weiznich / diesel_async Goto Github PK
View Code? Open in Web Editor NEWDiesel async connection implementation
License: Apache License 2.0
Diesel async connection implementation
License: Apache License 2.0
hello, i have defined a new trait by diesel_async:
`use std::marker::PhantomData;
use diesel::mysql::Mysql;
use diesel::pg::Pg;
use diesel::query_dsl::LoadQuery;
use diesel::sql_types::BigInt;
use diesel::{query_builder::*, QueryResult};
use diesel_async::RunQueryDsl;
use diesel_async::AsyncConnection;
pub trait Paginate: Sized {
fn paginate(self, offset: usize, limit: usize) -> PaginatedQuery<Self, DB> {
PaginatedQuery {
query: self,
offset: offset as i64,
limit: limit as i64,
_marker: PhantomData,
}
}
}
impl<T, DB> Paginate for T {}
#[derive(Debug)]
pub struct PaginatedQuery<T, Conn> {
query: T,
offset: i64,
limit: i64,
_marker: PhantomData,
}
impl<T, Conn> PaginatedQuery<T, Conn> {
pub fn load_and_total<'a, U>(self, conn: &mut Conn) -> QueryResult<(Vec, i64)>
where
Self: LoadQuery<'a, Conn, (U, i64)>,
{
let results = self.load::<(U, i64)>(conn)?;
let total = *results.get(0).map(|(, total)| total).unwrap_or(&0);
let records: Vec = results.into_iter().map(|(record, )| record).collect();
Ok((records, total))
}
}but the follow code reports an error.
impl<T, Conn> RunQueryDsl for PaginatedQuery<T, Conn> {}the error is "conflicting implementations of trait
diesel_async::RunQueryDsl<>for type
paginate_async::PaginatedQuery<, _>conflicting implementation in crate
diesel_async`:
serde_json
, chrono
postgres
, bb8
In my application I'm batch-inserting a couple of records at a time. I was looking into using diesel_async
to support a wider async refactoring of the application, but my previously working batch insert query now results in a compilation error, see compiler output spoiler below. I'm sure its a simple fault on my end but I couldn't find anything in regards to diesel_async
and batch inserts.
If I change above code to only insert a single element .execute(records[0])
, the code compiles just fine.
My interpretation of the compiler output is that the diesel_async
ExecuteDsl
is not implemented for batch insert statements. Either way, I'd be glad to contribute a documentation example of whatever the solution is.
Batch inserting via a bb8 Postgres connection.
Compilation succeeds.
error[E0277]: the trait bound InsertStatement<table, BatchInsert<'_, StashRecord, table>>: diesel::query_builder::query_id::QueryId
is not satisfied
--> crates/indexer/src/sinks/postgres.rs:43:22
|
43 | .execute(&mut conn)
| ------- ^^^^^^^^^ the trait diesel::query_builder::query_id::QueryId
is not implemented for InsertStatement<table, BatchInsert<'_, StashRecord, table>>
| |
| required by a bound introduced by this call
|
= help: the following other types implement trait diesel::query_builder::query_id::QueryId
:
&'a T
()
(T0, T1)
(T0, T1, T2)
(T0, T1, T2, T3)
(T0, T1, T2, T3, T4)
(T0, T1, T2, T3, T4, T5)
(T0, T1, T2, T3, T4, T5, T6)
and 221 others
= note: required for InsertStatement<table, BatchInsert<'_, StashRecord, table>>
to implement diesel_async::methods::ExecuteDsl<_, _>
note: required by a bound in diesel_async::RunQueryDsl::execute
--> /Users/maximumstock/.cargo/registry/src/github.com-1ecc6299db9ec823/diesel-async-0.2.2/src/run_query_dsl/mod.rs:221:15
|
221 | Self: methods::ExecuteDsl + 'query,
| ^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in RunQueryDsl::execute
error[E0277]: the trait bound impl futures::Future<Output = Postgres>: sinks::sink::Sink
is not satisfied
--> crates/indexer/src/main.rs:187:24
|
187 | sinks.push(Box::new(Postgres::connect(url)));
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait sinks::sink::Sink
is not implemented for impl futures::Future<Output = Postgres>
|
= help: the following other types implement trait sinks::sink::Sink
:
Postgres
RabbitMq<'a>
= note: required for the cast from impl futures::Future<Output = Postgres>
to the object type dyn sinks::sink::Sink
warning: unused import: diesel_async::methods::ExecuteDsl
--> crates/indexer/src/sinks/postgres.rs:4:5
|
4 | use diesel_async::methods::ExecuteDsl;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Some errors have detailed explanations: E0195, E0277, E0412.
For more information about an error, try rustc --explain E0195
.
warning: indexer
(bin "indexer") generated 5 warnings
use diesel_async::{
pooled_connection::{bb8::Pool, AsyncDieselConnectionManager},
AsyncPgConnection, RunQueryDsl,
};
use crate::schema::stash_records::dsl::*;
#[derive(Serialize, Insertable, Queryable)]
#[table_name = "stash_records"]
pub struct StashRecord {
pub created_at: NaiveDateTime,
pub change_id: String,
pub next_change_id: String,
pub stash_id: String,
pub stash_type: String,
pub items: serde_json::Value,
pub public: bool,
pub account_name: Option<String>,
pub last_character_name: Option<String>,
pub stash_name: Option<String>,
pub league: Option<String>,
pub chunk_id: i64,
}
async fn handle(&self, records: &[StashRecord]) -> Result<usize, Box<dyn std::error::Error>> {
// inlined for readability
let pool = Pool::builder().build(config).await.expect("Postgres database connect");
let mut conn = pool.get().await?;
let query = diesel::insert_into(stash_records).values(records).into();
diesel::insert_into(stash_records)
.values(records)
.execute(&mut conn)
.await
.map_err(|e| e.into())
}
and my schema.rs
:
table! {
stash_records (created_at) {
created_at -> Timestamp,
change_id -> Text,
next_change_id -> Text,
stash_id -> Text,
stash_type -> Text,
items -> Jsonb,
public -> Bool,
account_name -> Nullable<Text>,
last_character_name -> Nullable<Text>,
stash_name -> Nullable<Text>,
league -> Nullable<Text>,
chunk_id -> Nullable<Int8>,
}
}
Here is a link to the commit in my repository.
The examples provided in the README and on crates.io are outdated and do not work.
Example
The code snipped
let mut connection = AsyncPgConnection::establish(std::env::var("DATABASE_URL")?).await?;
// use ordinary diesel query dsl to construct your query
let data: Vec<User> = users::table
.filter(users::id.gt(0))
.or_filter(users::name.like("%Luke"))
.select(User::as_select())
// execute the query via the provided
// async `diesel_async::RunQueryDsl`
.load(&mut connection)
.await?;
Will result in the following error
error[E0277]: the trait bound `User: FromSqlRow<_, Pg>` is not satisfied
--> src/main.rs:62:15
|
62 | .load(&mut connection)
| ---- ^^^^^^^^^^^^^^^ the trait `FromSqlRow<_, Pg>` is not implemented for `User`
| |
| required by a bound introduced by this call
Also the bb8 code snippet
let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(std::env::var("DATABASE_URL")?);
let pool = Pool::builder().build(config).await?;
// checkout a connection from the pool
let mut conn = pool.get().await?;
// use the connection as ordinary diesel-async connection
let res = users::table.select(User::as_select()).load::(&mut conn).await?;
Will result in the following error
error: field expressions cannot have generic arguments
--> src/main.rs:61:54
|
61 | let res = users::table.select(User::as_select()).load::(&mut conn).await?;
| ^^^^^^^^^^^^^^^^^
And even if we use the turbofish syntax load::<User>
the code will still output errors
--> src/main.rs:61:67
|
61 | let res = users::table.select(User::as_select()).load::<User>(&mut conn).await?;
| ---- ^^^^^^^^^ the trait `FromSqlRow<_, _>` is not implemented for `User`
| |
| required by a bound introduced by this call
|
Rust: rustc 1.66.0
Diesel: 2.0.3
Diesel_async: 0.3.1 (crate.io)
Database: PostgreSQL
Operating System: Ubuntu 22.04
diesel: ["r2d2", "uuid", "chrono"],
diesel_async: ["postgres"]
diesel_migrations: ["postgres"]
When I try to write raw query using sql_query function i get the following errors, used to work fine with diesel_async 0.2.2
error[E0277]: the trait bound Untyped: CompatibleType<models::entities::generated::Entity, Pg>
is not satisfied
--> src/models/entities/index.rs:33:41
|
33 | "#).bind::<Integer,_>(param_id).get_results::(db).await
| ^^^^^^^^^^^ the trait CompatibleType<models::entities::generated::Entity, Pg>
is not implemented for Untyped
|
= help: the trait CompatibleType<U, DB>
is implemented for Untyped
= note: required for query_builder::sql_query::UncheckedBind<diesel::query_builder::SqlQuery, &i32, diesel::sql_types::Integer>
to implement diesel_async::methods::LoadQuery<'_, _, models::entities::generated::Entity>
error[E0277]: the trait bound Untyped: CompatibleType<models::entities::generated::Entity, Pg>
is not satisfied
--> src/models/entities/index.rs:33:63
|
33 | "#).bind::<Integer,_>(param_id).get_results::(db).await
| ----------- ^^ the trait CompatibleType<models::entities::generated::Entity, Pg>
is not implemented for Untyped
| |
| required by a bound introduced by this call
|
= help: the trait CompatibleType<U, DB>
is implemented for Untyped
= note: required for query_builder::sql_query::UncheckedBind<diesel::query_builder::SqlQuery, &i32, diesel::sql_types::Integer>
to implement diesel_async::methods::LoadQuery<'_, _, models::entities::generated::Entity>
note: required by a bound in diesel_async::RunQueryDsl::get_results
--> /home/aman/.cargo/registry/src/github.com-1ecc6299db9ec823/diesel-async-0.3.1/src/run_query_dsl/mod.rs:563:15
|
563 | Self: methods::LoadQuery<'query, Conn, U> + 'query,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in diesel_async::RunQueryDsl::get_results
No additional error is shown.
#[derive(Debug, Serialize, Deserialize, Clone, Queryable, Apiv2Schema, Insertable, AsChangeset, Identifiable, Selectable)]
#[diesel(table_name=entities, primary_key(id))]
pub struct Entity {
pub id: i32,
pub parent_entity: i32,
pub name: String,
pub icon: Option<String>,
pub created_at: chrono::NaiveDateTime,
}
impl Entity{
pub async fn read_chain<U>(db: &mut Connection, param_id: &i32)->QueryResult<Vec<Self>>
{
sql_query(r#"
WITH RECURSIVE subentities AS (
SELECT
*
FROM
entities
WHERE
id = $1
UNION
SELECT
e.*
FROM
entities e
INNER JOIN subentities s ON s.id = e.parent_entity
) SELECT
*
FROM
subentities;
"#).bind::<Integer,_>(param_id).get_results::<Entity>(db).await
}
}
I have already looked over the issue tracker for similar possible closed issues.
This issue can be reproduced on Rust's stable channel. (Your issue will be
closed if this is not the case)
The following reproducible sample triggers a compiler error:
[dependencies]
diesel = { version = "2.1.1", features = ["postgres"] }
diesel-async = { version = "0.4.1", features = ["postgres", "bb8"] }
use diesel::pg::Pg;
use diesel::result::Error;
use diesel_async::scoped_futures::ScopedFutureExt;
use diesel_async::{AsyncConnection, RunQueryDsl};
pub async fn demo<T: AsyncConnection<Backend = Pg>>(conn: &mut T) -> Result<(), Error> {
conn.transaction::<(), Error, _>(|conn| async {
diesel::sql_query("insert into posts (title, body) values ('title', 'body')").execute(conn).await?;
Ok(())
}.scope_boxed()).await?;
Ok(())
}
error[E0310]: the parameter type `T` may not live long enough
--> src/main.rs:7:45
|
7 | conn.transaction::<(), Error, _>(|conn| async {
| _____________________________________________^
8 | | diesel::sql_query("insert into posts (title, body) values ('title', 'body')").execute(conn).await?;
9 | | Ok(())
10 | | }.scope_boxed()).await?;
| |_______________________^ ...so that the type `T` will meet its required lifetime bounds
|
help: consider adding an explicit lifetime bound...
|
6 | pub async fn demo<T: AsyncConnection<Backend = Pg> + 'static>(conn: &mut T) -> Result<(), Error> {
| +++++++++
As far as I know, using .transaction()
and .execute()
in this way should not require a 'static
bound.
I ended up investigating myself, created a StackOverflow post about it when I thought I was stuck, and ended up self-answering it as an odd interaction with GATs and Self
-bounds.
While this isn't a major roadblock for using diesel-async (since workarounds exist) it would be nice to avoid this problem. The solution is to avoid where Self: 'conn
on the associated types of AsyncConnection
. This can be done (though a bit of a hack) via a dummy function as shown in this comment. I was able to modify diesel-async with this change and tested it myself. It does fix the above error and I didn't see any conflicts or downsides.
So let me know if that's worth a PR. Or if that doesn't sound worth it or would be problematic, that's fine too.
I am new on Rust and I try to implement Pagination with PostgreSQL and DeadPool by inspired from diesel_filter it works with simple type not complex query like sub query
This working and satisfied type .load_and_count::<(String, String)>(&mut conn)
let mut query = tbl_users.into_boxed();
if let Some(roleid) = user.role_id {
query = query.filter(crate::schema::tbl_users::role_id.nullable().eq(roleid));
}
let query_result = query
.filter(
crate::schema::tbl_users::deleted_at.is_null(),
)
.left_join(tbl_users_histories)
.select((
crate::schema::tbl_users::nickname,
crate::schema::tbl_users::login_id,
))
.paginate(Some(1))
.per_page(Some(2))
.load_and_count::<(String, String)>(&mut conn)
.await.map_err(|err| {
ServiceError::new(
StatusCode::INTERNAL_SERVER_ERROR,
false,
format!("DB show() users error: {:?}", err),
-1,
)
})?;
println!("User Paging:{:#?}", query_result);
todo!()
Not satisfied .load_and_count::<(HistoryInfo, String, String)>(&mut conn)
let mut query = tbl_users.into_boxed();
if let Some(roleid) = user.role_id {
query = query.filter(crate::schema::tbl_users::role_id.nullable().eq(roleid));
}
let query_result = query
.filter(
crate::schema::tbl_users::deleted_at.is_null(),
)
.left_join(tbl_users_histories)
.select((
HistoryInfo::as_select(),
crate::schema::tbl_users::nickname,
crate::schema::tbl_users::login_id,
))
.paginate(Some(1))
.per_page(Some(2))
.load_and_count::<(HistoryInfo, String, String)>(&mut conn).await.map_err(|err| {
ServiceError::new(
StatusCode::INTERNAL_SERVER_ERROR,
false,
format!("DB show() users error: {:?}", err),
-1,
)
})?;
println!("User Paging:{:#?}", query_result);
todo!()
#[derive(Debug, Serialize, Deserialize, Queryable, Selectable, Identifiable, Clone, PartialEq)]
#[diesel(table_name = tbl_users_histories)]
pub struct HistoryInfo {
pub id : i32,
pub history_desc : String,
}
Paginated
use diesel::{
pg::Pg, prelude::*, query_builder::*, sql_types::BigInt,
};
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::{RunQueryDsl, AsyncPgConnection};
use diesel_async::methods::LoadQuery;
use std::result::Result;
use deadpool::managed::Object;
pub use diesel_filter_query::*;
#[cfg(feature = "serialize")]
pub use serialize::*;
pub const DEFAULT_PER_PAGE: i64 = 10;
#[cfg(feature = "serialize")]
pub mod serialize {
use serde::Serialize;
#[derive(Serialize)]
pub struct PaginatedPayload<T>
where
T: Serialize,
{
pub data: Vec<T>,
total: i64,
}
impl<T> From<(Vec<T>, i64)> for PaginatedPayload<T>
where
T: Serialize,
{
fn from(data: (Vec<T>, i64)) -> Self {
Self {
data: data.0,
total: data.1,
}
}
}
}
pub trait Paginate: Sized {
fn paginate(self, page: Option<i64>) -> Paginated<Self>;
}
impl<T> Paginate for T {
fn paginate(self, page: Option<i64>) -> Paginated<Self> {
let page = page.unwrap_or(1);
Paginated {
query: self,
per_page: DEFAULT_PER_PAGE,
page: page,
offset: (page - 1) * DEFAULT_PER_PAGE,
}
}
}
#[derive(Debug, Clone, Copy, QueryId)]
pub struct Paginated<T> {
query: T,
page: i64,
offset: i64,
per_page: i64,
}
impl<T> Paginated<T> {
pub fn per_page(self, per_page: Option<i64>) -> Self {
let per_page = per_page.unwrap_or(DEFAULT_PER_PAGE);
Paginated {
per_page,
offset: (self.page - 1) * per_page,
..self
}
}
pub async fn load_and_count<'a, U>(
self,
conn: &mut Object<AsyncDieselConnectionManager<AsyncPgConnection>>,
) -> Result<(Vec<U>, i64), Box<dyn std::error::Error>>
where
Self: LoadQuery<'a, AsyncPgConnection, (U, i64)> + 'a,
U: std::marker::Send,
{
let per_page = self.per_page;
let results = self
.load::<(U, i64)>(conn)
.await
.map_err(|err| Box::new(err) as Box<dyn std::error::Error>)?;
let total = results.get(0).map(|x| x.1).unwrap_or(0);
let records = results.into_iter().map(|x| x.0).collect();
let total_pages = (total as f64 / per_page as f64).ceil() as i64;
Ok((records, total_pages))
}
}
impl<T: Query> Query for Paginated<T> {
type SqlType = (T::SqlType, BigInt);
}
//Comment to avoid connflict impl trait RunQueryDsl
// impl<T> RunQueryDsl<AsyncPgConnection> for Paginated<T> {}
impl<T> QueryFragment<Pg> for Paginated<T>
where
T: QueryFragment<Pg>,
{
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.push_sql("SELECT *, COUNT(*) OVER () FROM (");
self.query.walk_ast(out.reborrow())?;
out.push_sql(") t LIMIT ");
out.push_bind_param::<BigInt, _>(&self.per_page)?;
out.push_sql(" OFFSET ");
out.push_bind_param::<BigInt, _>(&self.offset)?;
Ok(())
}
}
pub struct PaginationOptions {
pub per_page: i64,
pub page: i64,
}
Make function pub async fn load_and_count
with trait bound Self: LoadQuery<'a, AsyncPgConnection, (U, i64)> + 'a,
satisfied complex type
error[E0277]: the trait bound `((diesel::expression::select_by::SelectBy<HistoryInfo, Pg>, diesel::sql_types::Text, diesel::sql_types::Text), diesel::sql_types::BigInt): CompatibleType<((HistoryInfo, std::string::String, std::string::String), i64), Pg>` is not satisfied
--> src/repositories/user/user_repo.rs:141:10
|
141 | .load_and_count::<(HistoryInfo, String, String)>(&mut conn)
| ^^^^^^^^^^^^^^ the trait `CompatibleType<((HistoryInfo, std::string::String, std::string::String), i64), Pg>` is not implemented for `((diesel::expression::select_by::SelectBy<HistoryInfo, Pg>, diesel::sql_types::Text, diesel::sql_types::Text), diesel::sql_types::BigInt)`
|
= help: the following other types implement trait `CompatibleType<U, DB>`:
(ST0,)
(ST0, ST1)
(ST0, ST1, ST2)
(ST0, ST1, ST2, ST3)
(ST0, ST1, ST2, ST3, ST4)
(ST0, ST1, ST2, ST3, ST4, ST5)
(ST0, ST1, ST2, ST3, ST4, ST5, ST6)
(ST0, ST1, ST2, ST3, ST4, ST5, ST6, ST7)
and 24 others
= note: required for `Paginated<BoxedSelectStatement<'_, (SelectBy<HistoryInfo, Pg>, Text, Text), FromClause<JoinOn<..., ...>>, ...>>` to implement `diesel_async::methods::LoadQuery<'_, AsyncPgConnection, ((HistoryInfo, std::string::String, std::string::String), i64)>`
note: required by a bound in `Paginated::<T>::load_and_count`
--> /home/.../diesel_filter/core/src/pagination.rs:84:15
|
79 | pub async fn load_and_count<'a, U>(
| -------------- required by a bound in this associated function
...
84 | Self: LoadQuery<'a, AsyncPgConnection, (U, i64)> + 'a,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Paginated::<T>::load_and_count`
No
Firstly, thank you for your work on this crate and on Diesel itself! They have been very nice to work with compared to using tokio_postgres directly.
I gather you're aware of this issue already (https://www.reddit.com/r/rust/comments/zn9ut0/announcing_dieselasync_020/), but as dangling transactions can cause deadlocks and data loss, I'm wondering whether it might be possible to improve the current behaviour, or at least document it more prominently if that is not possible.
As a quick-and-dirty solution, would something like the following make sense, where the connection is dropped from the pool (with implicit rollback) when it is recycled with an active transaction?
diff --git a/src/pg/mod.rs b/src/pg/mod.rs
index 6a4832b..b16a0a0 100644
--- a/src/pg/mod.rs
+++ b/src/pg/mod.rs
@@ -475,6 +475,15 @@ async fn lookup_type(
#[cfg(any(feature = "deadpool", feature = "bb8", feature = "mobc"))]
impl crate::pooled_connection::PoolableConnection for AsyncPgConnection {}
+impl crate::pooled_connection::PoolableConnection for AsyncPgConnection {
+ fn is_broken(&mut self) -> bool {
+ match self.transaction_state().status.transaction_depth() {
+ Ok(Some(_)) => true,
+ Ok(None) => false,
+ Err(_) => true,
+ }
+ }
+}
#[cfg(test)]
pub mod tests {
diff --git a/src/pooled_connection/mod.rs b/src/pooled_connection/mod.rs
index ae5abdf..9c35d06 100644
--- a/src/pooled_connection/mod.rs
+++ b/src/pooled_connection/mod.rs
@@ -258,7 +258,7 @@ pub trait PoolableConnection: AsyncConnection {
/// [ManageConnection::has_broken] for details.
///
/// The default implementation does not consider any connection as broken
- fn is_broken(&self) -> bool {
+ fn is_broken(&mut self) -> bool {
false
}
}
Alternatively, could an approach like tokio_postgres uses work instead?
https://github.com/sfackler/rust-postgres/pull/835/files?
Anything that causes a transaction future to be dropped before being polled to completion, eg the calling future terminating due to client disconnection, or things like timeouts
let mut pg = pool.get().await?;
tokio::time::timeout(std::time::Duration::from_secs(1),
pg.transaction::<_, Error, _>(|pg| async move {
sql_query("some sql").execute(pg).await?;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
Ok(())
}.scope_boxed())
).await.unwrap_err();
Hello,
I would like to know if you plan to add the possibility of using sql_query in async?
Thank you in advance for your reply.
I'm unable to use .save_changes() on a structure, as the connection pool doesn't implement the valid "UpdateAndFetchResults" trait.
Use the method on a structure that derives AsChangeset and Identifiable to save changes.
Program to compile.
error[E0277]: the trait bound `deadpool::managed::Object<AsyncDieselConnectionManager<AsyncPgConnection>>: diesel_async::UpdateAndFetchResults<&Post, _>` is not satisfied
--> src/main.rs:57:31
|
57 | post.save_changes(&mut conn).await?;
| ------------ ^^^^^^^^^ the trait `diesel_async::UpdateAndFetchResults<&Post, _>` is not implemented for `deadpool::managed::Object<AsyncDieselConnectionManager<AsyncPgConnection>>`
| |
| required by a bound introduced by this call
|
= help: the trait `diesel_async::UpdateAndFetchResults<Changes, Output>` is implemented for `AsyncPgConnection`
note: required by a bound in `diesel_async::SaveChangesDsl::save_changes`
--> /Users/cooper/.cargo/registry/src/github.com-1ecc6299db9ec823/diesel-async-0.3.0/src/run_query_dsl/mod.rs:678:15
|
678 | Conn: UpdateAndFetchResults<Self, T>,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `SaveChangesDsl::save_changes`
No
use diesel::{ExpressionMethods, QueryDsl, SelectableHelper, update};
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::{RunQueryDsl, AsyncConnection, AsyncPgConnection, SaveChangesDsl};
#[derive(Debug, Queryable, Selectable, AsChangeset, Identifiable)]
#[diesel(table_name = crate::schema::posts)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct Post {
pub id: i32,
pub title: String,
pub body: String,
pub published: bool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv().ok();
let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(std::env::var("DATABASE_URL")?);
let pool = Pool::builder(config).build()?;
let mut conn = pool.get().await?;
let results = posts
.filter(published.eq(true))
.order(id.desc())
.select(Post::as_select())
.load(&mut conn)
.await?;
for mut post in results {
post.body = format!("1-{}", post.body);
post.save_changes(&mut conn).await?;
}
Ok(())
}
diesel-async + AsyncMysqlConnection gives error after executing more than 16382 inserts, while diesel does not
Can't create more than max_prepared_stmt_count statements (current value: 16382)
Specifically, I used 100 threads/futures, each insert-select-delete 200 times. The thread pool was set max connection count to 30.
diesel-async gives the error, whilst disel(sync) does not and can finish as expected.
Can't create more than max_prepared_stmt_count statements (current value: 16382)
On my local machine they both reached about ~2k QPS. The concurrent connections was 30.
reproducible repo (with corresponding failing CI): https://github.com/gwy15/diesel-async-example
Since diesel + multithread does not have a problem, I expect diesel-async to have the same ability.
Can't create more than max_prepared_stmt_count statements
No
migration
CREATE TABLE `user` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) NOT NULL,
`email` varchar(255) NOT NULL,
`created_at` datetime(3) NOT NULL DEFAULT now(3),
`updated_at` datetime(3) NOT NULL DEFAULT now(3) ON UPDATE now(3),
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
main.rs
use diesel_async::{AsyncConnection, AsyncMysqlConnection, RunQueryDsl};
mod schema {
diesel::table! {
user (id) {
id -> Unsigned<Bigint>,
name -> Varchar,
email -> Varchar,
created_at -> Datetime,
updated_at -> Datetime,
}
}
}
#[derive(diesel::Insertable)]
#[diesel(table_name = schema::user)]
pub struct NewUser<'a> {
pub name: &'a str,
pub email: &'a str,
}
impl<'a> NewUser<'a> {
pub async fn insert(&self, conn: &mut AsyncMysqlConnection) -> anyhow::Result<()> {
diesel::insert_into(schema::user::dsl::user)
.values(self)
.execute(conn)
.await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let url = dotenv::var("DATABASE_URL")?;
let mut conn = diesel_async::AsyncMysqlConnection::establish(&url).await?;
for i in 0..16382 + 10 {
NewUser {
name: &format!("name-{}", i),
email: &format!("email-{}", i),
}
.insert(&mut conn)
.await?;
if i % 100 == 0 {
println!("{}", i);
}
}
Ok(())
}
cargo.toml
[dependencies]
anyhow = "1.0.65"
diesel = { version = "2.0.0", features = ["mysql", "chrono"] }
diesel-async = { version = "0.1.0", features = ["bb8", "mysql", "tokio"], optional = true }
dotenv = "0.15.0"
tokio = { version = "1.21.2", features = ["full"] }
to run locally, please
git clone https://github.com/gwy15/diesel-async-example
cd diesel-async-example
docker-compose up -d
diesel migration run
cargo t --no-default-features --features async
I can't use MultiConnection with async Connection
I would like to use MultiConnection trait with async connection
No issue
the trait diesel::Connection is not implemented for diesel_async::AsyncMysqlConnection
#[derive(diesel::MultiConnection)]
pub enum InferConnection {
Pg(AsyncPgConnection),
Mysql(AsyncMysqlConnection),
}
RunQueryDsl overrides calls to vec.first to RunQueryDsl::first() this happens to all possible calls to first() if the trait is imported.
The diesel_async::RunQueryDsl
trait overrides the default behavior of any structs first()
function to RunQueryDsl::first causing a nasty compile time error.
I wish to use the vec.first() operation instead of vec.get(0). (Mainly becuase clippy errors, but this is a pretty hard bug to find).
Here is a gist that shows the issue at hand.
https://gist.github.com/fastfists/9d0f1da80f825be39f9a4499aa8990a7
hello, i have defined a new trait by diesel_async:
`use std::marker::PhantomData;
use diesel::mysql::Mysql;
use diesel::pg::Pg;
use diesel::query_dsl::LoadQuery;
use diesel::sql_types::BigInt;
use diesel::{query_builder::*, QueryResult};
use diesel_async::RunQueryDsl;
use diesel_async::AsyncConnection;
pub trait Paginate: Sized {
fn paginate(self, offset: usize, limit: usize) -> PaginatedQuery<Self, DB> {
PaginatedQuery {
query: self,
offset: offset as i64,
limit: limit as i64,
_marker: PhantomData,
}
}
}
impl<T, DB> Paginate for T {}
#[derive(Debug)]
pub struct PaginatedQuery<T, Conn> {
query: T,
offset: i64,
limit: i64,
_marker: PhantomData,
}
impl<T, Conn> PaginatedQuery<T, Conn> {
pub fn load_and_total<'a, U>(self, conn: &mut Conn) -> QueryResult<(Vec, i64)>
where
Self: LoadQuery<'a, Conn, (U, i64)>,
{
let results = self.load::<(U, i64)>(conn)?;
let total = *results.get(0).map(|(, total)| total).unwrap_or(&0);
let records: Vec = results.into_iter().map(|(record, )| record).collect();
Ok((records, total))
}
}but the follow code reports an error.
impl<T, Conn> RunQueryDsl for PaginatedQuery<T, Conn> {}`
the error is "conflicting implementations of traitdiesel_async::RunQueryDsl<>for typepaginate_async::PaginatedQuery<, _>conflicting implementation in cratediesel_async:
impl<T, Conn> diesel_async::RunQueryDsl for T;"
what should i do for this error? thank you.
Cannot created a pooled connection, and pass it to tonic. While trying to specify the type, rust analyzer throws an error.
I am trying to created a pooled diesel-async connection (any really, tried every example, now using the newest one on this github with rustls). Everythime i am trying to specify the type, compalier throws an error, that i am mssing some trait implementation.
https://github.com/mpiorowski/rusve/blob/diesel/service-notes/src/main.rs
I'm trying to get diesel-async + bb8 + postgres working, but its not able to convert the pooled connection -> AsyncPgConnection to work with diesel-async. I also can't find any unit tests for any of the connection pools.
Example:
use diesel_async::pooled_connection::bb8::Pool;
use diesel_async::pg::AsyncPgConnection;
pub type DbPool = Pool<AsyncPgConnection>;
pub async fn delete_for_community(
pool: &DbPool,
for_community_id: CommunityId,
) -> Result<usize, Error> {
let mut conn = pool.get().await.unwrap();
diesel::delete(community_moderator.filter(community_id.eq(for_community_id))).execute(&mut conn); // <--- Error here
}
Error:
diesel::delete(community_moderator.filter(community_id.eq(for_community_id))).execute(&mut conn); | ^^^^^^^ the trait
`Connection` is not implemented for `bb8::api::PooledConnection<'_, AsyncDieselConnectionManager<AsyncPgConnection>>` | =
help: the following other types implement trait `Connection`: PooledConnection<M> diesel::PgConnection
note: required because of the requirements on the impl of `diesel::query_dsl::methods::ExecuteDsl<bb8::api::PooledConnection<'_, AsyncDieselConnectionManager<AsyncPgConnection>>, Pg>` for `DeleteStatement<schema::community_moderator::table, query_builder::where_clause::WhereClause<diesel::expression::grouped::Grouped<diesel::expression::operators::Eq<schema::community_moderator::columns::community_id, diesel::expression::bound::Bound<diesel::sql_types::Integer, i32>>>>>` note: required by a bound in `diesel::RunQueryDsl::execute` --> /home/xxx/.cargo/registry/src/github.com-1ecc6299db9ec823/diesel-2.0.0/src/query_dsl/mod.rs:1401:15 | 1401 | Self: methods::ExecuteDsl<Conn>, | ^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `diesel::RunQueryDsl::execute`
After I upgrade the code from diesel-async 0.3.x to 0.4, I faced this error: Failed to find a type oid for ENUM_TYPE
.
After I downgrade, the error disappeared. The only change in the code base was diesel-async version.
postgres
postgres
When capturing data in the closure + future passed to AsyncConnection::transaction()
, lifetimes makes the code not compile.
The actual error given out of the box is the following:
error[E0597]: `user_id` does not live long enough
--> src/main.rs:30:66
|
30 | .transaction(|conn| Box::pin(async { users::table.find(*&user_id).first(conn).await }))
| ------ -------------------------------------^^^^^^^----------------------
| | | |
| | | borrowed value does not live long enough
| | returning this value requires that `user_id` is borrowed for `'static`
| value captured here
...
40 | }
| - `user_id` dropped here while still borrowed
The error is very confusing because, obviously, the closure and the future are consumed before the end of the main
function. And the 'static
lifetime makes no real sense.
I originally though the issue was due to automatic lifetimes assigned by rustc to the Future in the .transaction
function: give the same lifetime to the conn and to the future, so I tried to explicit them:
// extract of `src/transaction_manager.rs`
async fn transaction<'conn, 'fut, F, R, E>(conn: &'conn mut Conn, callback: F) -> Result<R, E>
where
F: FnOnce(&mut Conn) -> BoxFuture<'fut, Result<R, E>> + Send,
E: From<Error> + Send,
R: Send,
{
Self::begin_transaction(conn).await?;
With this, the compile error changes, but is as confusing as ever:
error: lifetime may not live long enough
--> src/main.rs:30:29
|
30 | .transaction(|conn| Box::pin(async { users::table.find(*&user_id).first(conn).await }))
| ----- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ returning this value requires that `'1` must outlive `'2`
| | |
| | return type of closure is Pin<Box<(dyn futures::Future<Output = Result<User, diesel::result::Error>> + std::marker::Send + '2)>>
| has type `&'1 mut AsyncPgConnection`
At least this time the lifetimes involved make sense: '1
(i.e. the reborrowed conn) must outlive '2
(i.e. the future that uses said conn). But it makes no sense in this case that the future outlives the connection (because this is what the compiler effectively tells us, that it may outlive it).
The full code is below.
Many thanks, this is the last hurdle for me to adopt diesel 2 + diesel-async!
Running a transaction that borrows data from around it.
A binary.
A lifetime error I cannot wrap my head around.
main.rs
:
use anyhow::Result;
use diesel::{prelude::*, table, Identifiable, Queryable};
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl};
table! {
users(id) {
id -> Integer,
name -> Text,
}
}
#[derive(Queryable, Identifiable, Debug, Clone)]
#[diesel(table_name = users)]
struct User {
id: i32,
name: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut conn = AsyncPgConnection::establish(&std::env::var("DATABASE_URL")?).await?;
let user_id = 4242;
// The *& is intended to make the closure capture a reference since i32 is Copy
let user: User = conn
.transaction(|conn| Box::pin(async { users::table.find(*&user_id).first(conn).await }))
.await?;
println!("Found user {user:?}");
Ok(())
}
Cargo.toml
:
[package]
name = "diesel-2-issue"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
diesel = { version = "2", features = ["postgres"] }
diesel-async = { path = "../diesel_async", version = "0.1", features = ["postgres"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }
sqlite is not supported
sqlite is nice for development & testing, when a fill DB is overkill
Hi.
I'm using the 0.2.0 version.
I have some tests to do, I pretend to activate the 'test transaction' mode with the AsyncPgConnection structure. The problem is that i have a Pool with an AsyncDieselConnection. Using 'pool.get().await.unwrap()' returns an Object that encapsulate the manager and then the pgconnection. So This is a question about how activate the 'test transactions' through this 'Object'?
Best regards!
I have the following types defined:
use diesel::{
pg::Pg,
query_builder::{AstPass, Query, QueryFragment},
r2d2::{ConnectionManager, PoolError, PooledConnection},
QueryResult, RunQueryDsl,
};
use diesel_async::{
AsyncPgConnection, async_connection_wrapper::AsyncConnectionWrapper,
};
use std::{cmp::min, sync::Arc};
pub type MyDbConnection = AsyncConnectionWrapper<AsyncPgConnection>;
pub type PgPool = diesel::r2d2::Pool<ConnectionManager<MyDbConnection>>;
pub type PgDbPool = Arc<PgPool>;
pub type PgPoolConnection = PooledConnection<ConnectionManager<MyDbConnection>>;
and this function to get a pool:
pub fn new_db_pool(database_url: &str) -> Result<PgDbPool, PoolError> {
let manager = ConnectionManager::<MyDbConnection>::new(database_url);
PgPool::builder().build(manager).map(Arc::new)
}
This all compiles fine.
When I try to use my PgPoolConnection
connection however, like this:
fn insert_to_db(
conn: &mut PgPoolConnection,
) {
let result = conn.build_transaction()
.read_write()
.run::<_, Error, _>(|pg_conn| {
insert_to_db_impl(
pg_conn,
¤t_ans_lookups,
&ans_lookups,
¤t_ans_primary_names,
&ans_primary_names,
¤t_ans_lookups_v2,
&ans_lookups_v2,
¤t_ans_primary_names_v2,
&ans_primary_names_v2,
)
...
I get this error:
no method named `build_transaction` found for mutable reference `&mut PooledConnection<ConnectionManager<AsyncConnectionWrapper<AsyncPgConnection, Tokio>>>` in the current scope
Given that this used to work when this code was just using the PgConnection from Diesel directly, and AsyncPgConnection itself implements build_transaction
, I can only assume the issue lies with AsyncConnectionWrapper
not exposing it properly.
Use Diesel without relying on libpq.
The code should compile an work happily (assuming I use spawn_blocking).
The compilation error above.
Nup. Though it's quite hard to figure out what's meant to be supported. This crate purportedly supports bb8 for example but it doesn't work for AsyncConnectionWrapper. r2d2 does (maybe, pending this fix) but it isn't documented that r2d2 is supported.
git clone [email protected]:aptos-labs/aptos-indexer-processors.git
git checkout bd8c9147c6db748678e3110b937cdd7eb819d574
cd rust
cargo build -p processor
OID lookup is broken – lookup_type
just returns an error vs 0.3.x where it called the lookup function.
This issue tracks the progress to relicense diesel_async
from AGPL 3.0 to MIT/Apache 2.0.
I merged contributions from the following people:
For @smklein, @Razican and @gwy15: Please comment on this issue whether you agree to relicense your code or not?
For @fluxxu, @johnchildren, @livingcorpse25, @treysidechain: I already checked your boxes above, as you agreed to transfer the ownership of your changes to me. Please comment if you disagree with this change, otherwise I assume your consent.
Hi @weiznich, would you mind creating a new release? The last release was already quite some time ago and since the latest release there have been some PostgreSQL related changes that I need in my project. Thanks in advance!
We have a rust server needs to connect to a postgres database. Locally this works when using docker-compose without issue. In testing environment, we are using kubernetes with the postgres database in its own pod.
(The production environment which we have not pushed to yet has the postgres db in RDS on AWS. As it is not working in the test server, we have not pushed to this yet obv!)
We have just swapped a python server to rust on a kubernetes cluster in a testing environment. The rust server uses diesel, diesel-async and deadpool to connect to the postgres database. This works locally when we are using docker-compose to spin up the database and everything can be connected locally. This also work in CI and passes all integrations tests.
The issue is when the server is deployed into a kubernetes cluster. The database is a pod in the cluster in namespace database, and the server is in production.
This is the code that runs the connection:
use std::env;
use diesel_async::pooled_connection::{deadpool::Pool, AsyncDieselConnectionManager};
use diesel_async::pg::AsyncPgConnection;
use once_cell::sync::Lazy;
/// The database connection pool.
pub static POOL: Lazy<Pool<AsyncPgConnection>> = Lazy::new(|| {
let connection_string = env::var("DB_URL").unwrap();
let config = AsyncDieselConnectionManager::<diesel_async::AsyncPgConnection>::new(connection_string);
Pool::builder(config).build().unwrap()
});
/// Gets a connection from the pool in an async manner.
#[macro_export]
macro_rules! get_connection {
() => {
POOL.get().await.map_err(|e| {
CustomError::new(
format!("Failed to get connection: {}", e),
CustomErrorStatus::Unknown
)
})
};
}
Connect the server to the database.
Connection to the database
The error is:
Failed to get connection: Error occurred while creating a new object: pg_hba.conf rejects connection for host <internal_url>, user <user> database <db>, no encryption
No but I have done the following:
I'm trying to persist a list of enums on an object. I'm getting a confusing error that I don't understand.
Use following code:
use std::io::Write;
use anyhow::Result;
use diesel::deserialize;
use diesel::deserialize::FromSql;
use diesel::deserialize::FromSqlRow;
use diesel::expression::AsExpression;
use diesel::pg::Pg;
use diesel::pg::PgValue;
use diesel::prelude::*;
use diesel::serialize;
use diesel::serialize::IsNull;
use diesel::serialize::Output;
use diesel::serialize::ToSql;
use diesel_async::{AsyncConnection, AsyncPgConnection, RunQueryDsl, SimpleAsyncConnection};
pub mod postgres {
pub mod schema {
pub mod sql_types {
#[derive(diesel::sql_types::SqlType)]
#[diesel(postgres_type(name = "action"))]
pub struct Action;
}
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::Action;
directory_actions (id) {
id -> Int4,
credentials_id -> Int4,
#[max_length = 255]
directory_path -> Varchar,
actions -> Array<Nullable<Action>>,
}
}
}
}
#[derive(Debug, FromSqlRow, AsExpression, Clone)]
#[diesel(sql_type = crate::postgres::schema::sql_types::Action)]
pub enum Action {
Read,
Write,
Delete,
List,
Admin,
}
// Implement ToSql for the custom enum
impl ToSql<crate::postgres::schema::sql_types::Action, Pg> for Action {
fn to_sql<'b>(&'b self, out: &mut Output<'b, '_, Pg>) -> serialize::Result {
match *self {
Action::Read => out.write_all(b"read")?,
Action::Write => out.write_all(b"write")?,
Action::Delete => out.write_all(b"delete")?,
Action::List => out.write_all(b"list")?,
Action::Admin => out.write_all(b"admin")?,
}
Ok(IsNull::No)
}
}
// Implement FromSql for the custom enum
impl FromSql<crate::postgres::schema::sql_types::Action, Pg> for Action {
fn from_sql(bytes: PgValue) -> deserialize::Result<Self> {
match bytes.as_bytes() {
b"read" => Ok(Action::Read),
b"write" => Ok(Action::Write),
b"delete" => Ok(Action::Delete),
b"list" => Ok(Action::List),
b"admin" => Ok(Action::Admin),
_ => Err("Unrecognized enum variant".into()),
}
}
}
#[derive(Debug, Identifiable, Queryable, Selectable, Insertable)]
#[diesel(table_name = crate::postgres::schema::directory_actions)]
#[diesel(check_for_backend(diesel::pg::Pg))]
pub struct DirectoryActions {
pub id: i32,
pub credentials_id: i32,
pub directory_path: String,
pub actions: Vec<Option<Action>>,
}
#[tokio::main]
async fn main() -> Result<()> {
let mut conn =
AsyncPgConnection::establish("postgres://localhost/diesel_test")
.await
.unwrap();
conn.begin_test_transaction().await.unwrap();
conn.batch_execute(
"CREATE TYPE Action AS ENUM ('read', 'write', 'delete', 'list', 'admin');
CREATE TABLE
directory_actions (
id SERIAL PRIMARY KEY,
credentials_id INTEGER NOT NULL,
directory_path VARCHAR(255) NOT NULL,
actions Action[] NOT NULL check (actions <> '{}' and array_position(actions, null) is null)
);",
)
.await
.unwrap();
let directory_actions = vec![DirectoryActions {
id: 1,
credentials_id: 1,
directory_path: "".into(),
actions: vec![Some(Action::Read)],
}];
diesel::insert_into(crate::postgres::schema::directory_actions::table)
.values(&directory_actions)
.execute(&mut conn)
.await
.unwrap();
let res = crate::postgres::schema::directory_actions::table
.select(DirectoryActions::as_select())
.load(&mut conn)
.await
.unwrap();
dbg!(res);
Ok(())
}
Cargo.toml:
[dependencies]
tokio = { version = "1.32.0", features = ["full"] }
diesel = { version = "2.1.3", features = ["postgres", "chrono"] }
diesel-async = { version = "0.4.1", features = [
"postgres",
"deadpool",
"async-connection-wrapper",
"tokio"
] }
anyhow = "1.0"
Persist a list of enums
No error
thread 'main' panicked at src/main.rs:122:10:
called Result::unwrap()
on an Err
value: SerializationError(FailedToLookupTypeError(PgMetadataCacheKey { schema: None, type_name: "action" }))
note: run with RUST_BACKTRACE=1
environment variable to display a backtrace
No
["mysql_backend", "serde_json", "uuid"]
["mysql", "mobc"]
having such a table:
diesel::table! {
use diesel::sql_types::*;
use super::sql_types::StateMapping;
accounts (address, workchain) {
workchain -> Tinyint,
#[max_length = 32]
address -> Varbinary,
...
}
}
this query
#[derive(Queryable, Selectable)]
#[diesel(check_for_backend(Mysql))]
#[diesel(table_name = crate::schema::accounts)]
pub struct AccountWithCreatorInfo {
...
}
let query = accounts_dsl::accounts
.filter(accounts_dsl::workchain.eq(-1))
.select(AccountWithCreatorInfo::as_select());
produces query like this:
"SELECT ... FROM `accounts` WHERE (`accounts`.`workchain` = ?)",
binds: [
-1,
],
But here we receive values as Vec<u8>
,
take the first element and cast them as i64
. Casting u8::MAX as i64
returns 255
instead of original -1
bind.
I suggest changing this line to
MysqlType::Tiny => Value::Int((bind[0] as i8) as i64),
hi, I just wanted to ask this question early, I found in https://github.com/weiznich/diesel_async/blob/main/src/mysql/mod.rs#L55
and https://github.com/ Weiznich/diesel_async/blob/main/src/mysql/mod.rs#L142
The time zone of each connection is set in the code, which for countries that do not use UTC need to manually change the time zone of the current link after each get the link, if These operations can be simplified by providing a time zone configuration item.
["postgres_backend", "chrono", "uuid", "numeric", "ipnet-address", "serde_json" ]
, default features inactive["postgres"]
["postgres"]
Not really a problem per se, but a missing feature. I would like to not depend on diesel_cli
to manage migrations, so I was using diesel_migrations
to handle this, but the MigrationHarness
needed to retrieve the applied migrations requires the asynchronous connection to implement MigrationConnection
, which makes sure that the DB has the correct table.
Unfortunately, this is not yet implemented for asynchronous connections.
I'm trying to embed the migrations with my code, and even create the DB if needed, so that on startup, it will check that the DB is at the correct migration.
I would expect to be able to do this with an asynchronous connection, the same way as with a synchronous connection.
error[E0277]: the trait bound `AsyncPgConnection: MigrationConnection` is not satisfied
--> backend/src/lib.rs:152:20
|
152 | run_migrations(&mut conn).expect("couldn't run migrations");
| -------------- ^^^^^^^^^ the trait `MigrationConnection` is not implemented for `AsyncPgConnection`
| |
| required by a bound introduced by this call
|
= help: the trait `MigrationConnection` is implemented for `PgConnection`
= note: required because of the requirements on the impl of `MigrationHarness<Pg>` for `AsyncPgConnection`
note: required by a bound in `update_database::{closure#0}::run_migrations`
--> backend/src/lib.rs:123:31
|
122 | fn run_migrations(
| -------------- required by a bound in this
123 | connection: &mut impl MigrationHarness<Pg>,
| ^^^^^^^^^^^^^^^^^^^^ required by this bound in `update_database::{closure#0}::run_migrations`
error[E0277]: the trait bound `AsyncPgConnection: LoadConnection` is not satisfied
--> backend/src/lib.rs:152:20
|
152 | run_migrations(&mut conn).expect("couldn't run migrations");
| -------------- ^^^^^^^^^ the trait `LoadConnection` is not implemented for `AsyncPgConnection`
| |
| required by a bound introduced by this call
|
= help: the trait `LoadConnection<B>` is implemented for `PgConnection`
= note: required because of the requirements on the impl of `diesel::query_dsl::LoadQuery<'_, AsyncPgConnection, MigrationVersion<'static>>` for `SelectStatement<FromClause<diesel_migrations::migration_harness::__diesel_schema_migrations::table>, query_builder::select_clause::SelectClause<diesel_migrations::migration_harness::__diesel_schema_migrations::columns::version>, query_builder::distinct_clause::NoDistinctClause, query_builder::where_clause::NoWhereClause, query_builder::order_clause::OrderClause<diesel::expression::operators::Desc<diesel_migrations::migration_harness::__diesel_schema_migrations::columns::version>>>`
= note: required because of the requirements on the impl of `MigrationHarness<Pg>` for `AsyncPgConnection`
note: required by a bound in `update_database::{closure#0}::run_migrations`
--> backend/src/lib.rs:123:31
|
122 | fn run_migrations(
| -------------- required by a bound in this
123 | connection: &mut impl MigrationHarness<Pg>,
| ^^^^^^^^^^^^^^^^^^^^ required by this bound in `update_database::{closure#0}::run_migrations`
No additional error is shown.
/// Embedded DB migrations.
const MIGRATIONS: EmbeddedMigrations = embed_migrations!("../migrations");
/// Runs the migrations in the database.
#[allow(trivial_casts)]
fn run_migrations(
connection: &mut impl MigrationHarness<Pg>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
let migrations = (&MIGRATIONS as &dyn MigrationSource<Pg>).migrations()?;
let last = migrations
.last()
.expect("could not find the last migration");
if let Some(last_applied) = connection.applied_migrations()?.first() {
if last_applied > &last.name().version() {
// TODO: downgrade, probably add a confirmation parameter in the executable.
// But how? We would need to get the migrations for downgrade, but we don't have them.
panic!("Not possible to downgrade the database");
}
}
let _ = connection.run_pending_migrations(MIGRATIONS)?;
Ok(())
}
let mut conn = match db::Connection::establish(db_cfg.conn_str()).await {
Err(e) => {
eprintln!("WARNING: error connecting to {}", db_cfg.conn_str());
db::create(db_cfg)
.await
.expect("could not create the database")
}
Ok(c) => c,
};
I can work on this, but, should I add a feature to add this migrations connection implementation, so that it's not active by default for those who don't want to use migrations?
Also, I see that the diesel_migrations
trait expects the connection to run the SQL synchronously, so this might be trickier than expected.
I'm sorry in advance if this issue is badly reported, I'm a bit new to this...
Note: In this issue, MariaDB is used to describe both MariaDB & MySQL, as I think both are affected, but I only tested MariaDB.
In current Diesel version, a MySQL/MariaDB connection will see a flag activated to change the way UPDATE operations work:
// this is not present in the database_url, using a default value
let client_flags = CapabilityFlags::CLIENT_FOUND_ROWS;
On PostgreSQL, UPDATE operations will return the number of lines matched by the operation, even if the values set are identical to the ones already present in a row.
On MariaDB/MySQL, only the number of changed rows is reported by default. (Relevant StackOverflow answer: https://stackoverflow.com/a/2186952; Extract from MariaDB manual: https://dev.mysql.com/doc/c-api/8.3/en/mysql-affected-rows.html )
On Diesel, to get a consistent behavior between both engines, it seems like the flag to enable reporting of matching rows rather than modified rows is set on by default.
As Diesel-async seems to use an other way to parse the database URL, the same option isn't set by default and must be specified in the URL.
If we (as I got helped by someone to read diesel-async codebase) got the code right, diesel-async uses mysql-async, which supports setting this option:
https://docs.rs/mysql_async/0.32.2/mysql_async/struct.Opts.html#method.client_found_rows
I expected that the option would also be set in diesel-async to get a coherent and consistent behavior when switching between Diesel and Diesel-async.
In my case, I had tests which relied on reporting affected rows, which had to be worked-around when implementing support for MariaDB.
It would be nice for either diesel-async to also enable that setting by default, or for diesel to actually implement similar parsing of URLs to allow enabling/disabling the option using the URL.
UPDATE commands should either return changed rows count, or matching rows count, similarly regardless of if you use diesel or diesel-async
UPDATE commands only report changed rows in diesel-async, while it reports matching rows in diesel
No
I don't have a MWE for now, but it shouldn't be too hard to produce one if needed
["mysql", "64-column-tables"]
["mysql", "bb8", "tokio"]
use diesel_async::RunQueryDsl;
diesel::sql_query("select version();")
.load::<String>(&mut conn_result)
does not compile.
I wish to run a raw query to see if diesel_async is working
The code should compile?
error[E0277]: the trait bound `Untyped: CompatibleType<std::string::String, Mysql>` is not satisfied
--> examples/channel.rs:16:10
|
16 | .load::<String>(&mut conn_result)
| ^^^^ the trait `CompatibleType<std::string::String, Mysql>` is not implemented for `Untyped`
|
= help: the trait `CompatibleType<U, DB>` is implemented for `Untyped`
= note: required for `SqlQuery` to implement `diesel_async::methods::LoadQuery<'_, _, std::string::String>`
error[E0277]: the trait bound `Untyped: CompatibleType<std::string::String, Mysql>` is not satisfied
--> examples/channel.rs:16:25
|
16 | .load::<String>(&mut conn_result)
| ---- ^^^^^^^^^^^^^^^^ the trait `CompatibleType<std::string::String, Mysql>` is not implemented for `Untyped`
| |
| required by a bound introduced by this call
|
= help: the trait `CompatibleType<U, DB>` is implemented for `Untyped`
= note: required for `SqlQuery` to implement `diesel_async::methods::LoadQuery<'_, _, std::string::String>`
note: required by a bound in `diesel_async::RunQueryDsl::load`
--> /home/xiaoqilin/.cargo/registry/src/rsproxy.cn-8f6827c7555bfaf8/diesel-async-0.2.2/src/run_query_dsl/mod.rs:330:15
|
330 | Self: methods::LoadQuery<'query, Conn, U> + 'query,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `RunQueryDsl::load`
rustc 1.72.0
["postgres", "r2d2"]
["postgres", "bb8"]
Fail to get the connection, TimedOut
Manager the connection with the diesel-async
thread 'pg_async::tests::test_init_db' panicked at 'could not get connection: TimedOut', crates/srv-storage/src/pg_async.rs:39:41
stack backtrace:
0: rust_begin_unwind
at /rustc/065a1f5df9c2f1d93269e4d25a2acabbddb0db8d/library/std/src/panicking.rs:593:5
1: core::panicking::panic_fmt
at /rustc/065a1f5df9c2f1d93269e4d25a2acabbddb0db8d/library/core/src/panicking.rs:67:14
2: core::result::unwrap_failed
at /rustc/065a1f5df9c2f1d93269e4d25a2acabbddb0db8d/library/core/src/result.rs:1651:5
3: core::result::Result<T,E>::expect
at /rustc/065a1f5df9c2f1d93269e4d25a2acabbddb0db8d/library/core/src/result.rs:1033:23
4: srv_storage::pg_async::tests::test_init_db::{{closure}}
at ./src/pg_async.rs:39:24
No additional error is shown.
use diesel_async::{
pooled_connection::{bb8::Pool, bb8::PooledConnection, AsyncDieselConnectionManager},
AsyncPgConnection,
};
pub type DbConnectionManger = AsyncDieselConnectionManager<AsyncPgConnection>;
pub type DbPool = Pool<AsyncPgConnection>;
pub type DbConnection<'a> = PooledConnection<'a, AsyncDieselConnectionManager<AsyncPgConnection>>;
#[tracing::instrument()]
pub async fn init_db(database_url: &str) -> DbPool {
let mgr = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
Pool::builder()
.build(mgr)
.await
.expect("could not build connection pool")
}
#[cfg(test)]
mod tests {
use super::init_db;
use diesel::{prelude::*, sql_query, sql_types::Text};
use diesel_async::RunQueryDsl;
#[derive(QueryableByName)]
struct SqlVersion {
#[diesel(sql_type = Text)]
pub version: String,
}
#[tokio::main]
#[test]
async fn test_init_db() {
dotenvy::dotenv().ok();
let database_url = std::env::var("DATABASE_URL").expect("Expected DATABASE_URL to be set");
let pool = init_db(database_url.as_str()).await;
let mut conn = pool.get().await.expect("could not get connection");
let version = sql_query("SELECT version()")
.get_result::<SqlVersion>(&mut conn)
.await;
assert!(version.is_ok());
let version = version.unwrap();
println!("database version {}", version.version);
}
}
diesel = { version = "2.1.0", features = ["postgres", "r2d2"] }
diesel-async = { version = "0.3.1", features = ["postgres", "bb8"] }
I'm using meilisearch and postgres for an API using axum and when I use meilisearch with the code:
let results = meilisearch_client
.index("programs")
.search()
.with_query(&query)
.with_limit(pagination.limit.into())
.execute()
.await?;
I have and error:
error[E0277]: the trait bound `&mut SearchQuery<'_>: QueryFragment<_>` is not satisfied
--> src/controllers/programs.rs:66:18
|
66 | .execute()
| ^^^^^^^ the trait `QueryFragment<_>` is not implemented for `&mut SearchQuery<'_>`
|
= help: the following other types implement trait `QueryFragment<DB, SP>`:
<diesel_async::pg::TransactionBuilder<'a, C> as QueryFragment<Pg>>
<Box<T> as QueryFragment<DB>>
<DeleteStatement<T, U, Ret> as QueryFragment<DB>>
<FromClause<F> as QueryFragment<DB>>
<BoxedLimitOffsetClause<'a, Pg> as QueryFragment<Pg>>
<query_builder::select_clause::DefaultSelectClause<QS> as QueryFragment<DB>>
<BoxedSqlQuery<'_, DB, Query> as QueryFragment<DB>>
<query_builder::where_clause::BoxedWhereClause<'a, DB> as QueryFragment<DB>>
and 384 others
= note: required for `&mut SearchQuery<'_>` to implement `diesel_async::methods::ExecuteDsl<_, _>`
note: required by a bound in `diesel_async::RunQueryDsl::execute`
--> /home/user/.local/share/cargo/registry/src/index.crates.io-6f17d22bba15001f/diesel-async-0.4.1/src/run_query_dsl/mod.rs:222:15
|
219 | fn execute<'conn, 'query>(self, conn: &'conn mut Conn) -> Conn::ExecuteFuture<'conn, 'query>
| ------- required by a bound in this associated function
...
222 | Self: methods::ExecuteDsl<Conn> + 'query,
| ^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `RunQueryDsl::execute`
The compiler things that the execute() of meilisearch is the execute() from diesel_async::RunQueryDsl.
Execute the meilisearch sdk query.
I'm using meilisearch-sdk = "0.24.2"
With these imports:
use diesel::{
pg::Pg,
query_builder::{AstPass, Query, QueryFragment},
r2d2::{ConnectionManager, PoolError, PooledConnection},
QueryResult, RunQueryDsl,
};
use diesel_async::{AsyncPgConnection};
I have these types:
pub type PgPool = diesel::r2d2::Pool<ConnectionManager<MyDbConnection>>;
pub type PgDbPool = Arc<PgPool>;
pub type MyDbConnection = AsyncPgConnection;
pub type PgPoolConnection = PooledConnection<ConnectionManager<MyDbConnection>>;
Unfortunately, this function results in a compilation error:
pub fn new_db_pool(database_url: &str) -> Result<PgDbPool, PoolError> {
let manager = ConnectionManager::<MyDbConnection>::new(database_url);
PgPool::builder().build(manager).map(Arc::new)
}
The error:
the trait bound `AsyncPgConnection: R2D2Connection` is not satisfied
With this type instead:
pub type PgPool = diesel::r2d2::Pool<AsyncDieselConnectionManager<MyDbConnection>>;
You get this error:
the trait bound `AsyncDieselConnectionManager<AsyncPgConnection>: ManageConnection` is not satisfied
Use r2d2 with diesel-async. There is an r2d2 feature so you'd think it would work but it doesn't seem to. I imagine we need an r2d2 entry in src/pool_connection/.
The code should compile an work happily.
The compilation error above.
N/A
git clone [email protected]:aptos-labs/aptos-indexer-processors.git
git checkout c8188670b64b3733126ac4d0f6fa54bbffb655d1
cd rust
cargo build -p processor
When using Diesel's sync Pg implementation, I can connect via TLS, but when using the bb8 pooled connections via diesel-async I cannot. I tracked this down to diesel-async always passing NoTls
when creating a tokio-postgres connection.
I want to connect to a PostgreSQL database using TLS
I'd expect this to work
I cannot connect
The error I see is that I have to connect over TLS
Simply try to connect to a pgsql service which requires tls using a bb8 pooled diesel-async connection (frankly a non-pooled connection would exhibit the same issue)
If you don't have such a server, you can get a free account on bit.io
I'd hope for a postgres-openssl and postgres-nativetls pair of features for diesel-async which enable the relevant TLS implementation for the connections. In the meantime I just have a hacky pile of code I'm not proud of:
pub type Pool = bb8::Pool<AsyncPgConnection>;
fn establish_connection(url: &str) -> BoxFuture<ConnectionResult<AsyncPgConnection>> {
(async {
let builder = SslConnector::builder(SslMethod::tls())
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
let connector = MakeTlsConnector::new(builder.build());
let (client, connection) = tokio_postgres::connect(url, connector)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
AsyncPgConnection::try_from(client).await
})
.boxed()
}
pub async fn create_pool(db_url: &str) -> Result<Pool, PoolError> {
let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new_with_setup(
db_url,
establish_connection,
);
bb8::Pool::builder().build(config).await
}
When trying to connect to my database using AsyncPgConnection
I get a SSL error but it works using PgConnection
My postgres URL is just a simple username/password postgres:// url with no additional query params.
Cargo.toml
[dependencies]
diesel = { version = "2.0.3", features = ["postgres", "r2d2", "extras", "chrono", "uuid"] }
diesel-async = { version = "0.2.1", features = ["postgres", "bb8"] }
dotenv = "0.15.0"
serde = { version = "1.0.159", features = ["derive"]}
serde_json = "1.0.95"
chrono = { version = "0.4.24", features = ["serde"] }
bb8 = "0.8"
axum = "0.6.12"
axum-macros = "0.3.7"
tokio = { version = "1.27.0", features = ["full"] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
Using PgConnection
:
#[tokio::main]
async fn main() {
dotenv().ok();
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut connection = PgConnection::establish(&database_url).unwrap();
println!("Established!");
let orgs = orgs::table.load::<Org>(&mut connection).unwrap();
println!("{:?}", orgs);
}
Output:
Established!
[]
Using AsyncPgConnection
:
#[tokio::main]
async fn main() {
dotenv().ok();
let database_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
let mut connection = AsyncPgConnection::establish(&database_url).await.unwrap();
println!("Established!");
let orgs = orgs::table.load::<Org>(&mut connection).await.unwrap();
println!("{:?}", orgs);
Output
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: CouldntSetupConfiguration(DatabaseError(Unknown, "SSL/TLS required"))', src\main.rs:27:76
Hi,
I am sorry about not using your template. It is just question. Do you have any vision on support connections pooling here? Any plans? Any interest to contribution?
Some query types (haven't researched an exhaustive list) will not currently be usable with diesel_async as upstream diesel keeps their implementation behind the backend features. For example:
#[cfg(feature = "mysql")]
impl QueryFragment<crate::mysql::Mysql> for InsertOrIgnore {
fn walk_ast<'b>(
&'b self,
mut out: AstPass<'_, 'b, crate::mysql::Mysql>,
) -> QueryResult<()> {
out.push_sql("INSERT IGNORE");
Ok(())
}
}
As I do not have the mysql
flag on diesel enabled I cannot use insert_or_ignore_into
.
I am not sure what the right solution for this is really. I specifically cannot turn on mysql
on diesel as I explicitly want to avoid the native dependencies it requires. Potentially a separate 'MySQL syntax but not backend' flag on diesel would solve this – albeit it's not the most elegant.
I am attempting to migrate older sync diesel code, using insert_or_ignore_into
, to diesel_async. The same query fails to compile when async, while building fine in standard diesel.
diesel::insert_or_ignore_into(users::table)
.values((users::id.eq(1), users::name.eq("Jim")))
.execute(&mut connection)
.await?;
The query compiles and works as intended.
the trait `QueryFragment<Mysql>` is not implemented for `query_builder::insert_statement::private::InsertOrIgnore`
use diesel::prelude::*;
use diesel_async::{RunQueryDsl, AsyncConnection, AsyncMysqlConnection};
table! {
users {
id -> Integer,
name -> Text,
}
}
// create an async connection
let mut connection = AsyncMysqlConnection::establish(std::env::var("DATABASE_URL")?).await?;
diesel::insert_or_ignore_into(users::table)
.values((users::id.eq(1), users::name.eq("Jim")))
.execute(&mut connection)
.await?;
error[E0433]: failed to resolve: could not find test
in tokio
--> /Users/vimmerru/.cargo/git/checkouts/diesel_async-c976f3a13066b495/0917245/src/pg/transaction_builder.rs:366:10
After running for a few days, some connections in the connection pool will return an error
Query records in the database
No error
Got an error: "Unexpected end of row"
Input/output error: can't parse: buf doesn't have enough data
It's hard to reproduce the problem, after running for a few days, some connections in the pool will return errors. I think those connections are already corrupt but still can pass the health check of the connection pool. So I did a contrast experiment, when I found the connection got corrupt, the ping2
responded with ok, but the ping
responded with an error "buf doesn't have enough data"
use diesel_async::{pooled_connection::deadpool::Pool, AsyncMysqlConnection};
type MysqlPool = Pool<AsyncMysqlConnection>;
pub struct AppState {
pub mysql_pool: MysqlPool,
}
async fn ping(State(state): State<Arc<AppState>>) -> Result<impl IntoResponse> {
let mut conn = state.mysql_pool.get().await?;
let x: i32 = diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
.first(&mut conn)
.await?;
Ok(Json(x))
}
async fn ping2(State(state): State<Arc<AppState>>) -> Result<impl IntoResponse> {
let mut conn = state.mysql_pool.get().await?;
let _ = diesel::select(1_i32.into_sql::<diesel::sql_types::Integer>())
.execute(&mut conn)
.await?;
Ok(Json(())
}
Hi, are there any benchmarks for how fast is this compared to the normal diesel crate?
rustc 1.73.0-nightly (993deaa0b 2023-07-11)
0.3.1
postgres
Mac M1
postgres
Using the !table {...}
macro to specify the table and it's columns fails when the column names include an underscore. For example this fails -
table! {
customer {
customer_id -> Integer,
name -> Text,
}
}
but this works -
table! {
customer {
id -> Integer,
name -> Text,
}
}
The error it throws is
error[E0412]: cannot find type `id` in this scope
--> src/main.rs:14:1
|
14 | / table! {
15 | | customer {
16 | | customer_id -> Integer,
17 | | name -> Text,
18 | | }
19 | | }
| |_^ help: a builtin type with a similar name exists: `i8`
|
= note: this error originates in the macro `table` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0425]: cannot find value `id` in this scope
--> src/main.rs:14:1
|
14 | / table! {
15 | | customer {
16 | | customer_id -> Integer,
17 | | name -> Text,
18 | | }
19 | | }
| |_^ not found in this scope
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.