krojew / cdrs-tokio
High-level async Cassandra client written in 100% Rust.
License: Apache License 2.0
A builder will be more useful than a ton of creation functions.
Since 8.1.0 (it was working fine in 8.0.0), exec_with_params is returning ErrorType::Unprepared in some circumstances.
This should not occur because it is supposed to be handled here:
cdrs-tokio/cdrs-tokio/src/cluster/session.rs
Line 299 in e38bcc2
Edit:
I think the cause would have to involve the attempted retry also hitting an unprepared query error.
It's possible this is the fault of our proxy, but as far as I know the proxy should send the new prepare to all nodes in the cluster, so the second query failing should not be possible.
But the issue was definitely introduced by something in 8.1.0 ...
I will investigate further.
for row in rows {
    for col in &cols.metadata.col_specs {
        let value = match col.clone().col_type.id {
            ColType::Varchar | ColType::Ascii => {
                let val: String = row.get_by_name(col.name.as_str()).unwrap().expect("");
                println!("{:#?}", val)
            }
            ColType::Map => {
                let map_value: Map = row.get_by_name(col.name.as_str()).unwrap().expect("");
            }
            _ => {}
        };
    }
}
Does anyone have an idea how to convert a Map/List to serde_json::Value? Assume the struct is unknown.
I have a static string reference in my struct. When I try to compile this piece of code, the Rust compiler fails with a stack overflow. If having a reference is not supported, the macro should give a warning or an error instead of expanding to code that makes rustc crash.
#[derive(IntoCdrsValue, TryFromRow, Clone, Debug, Serialize, Deserialize)]
pub struct EventData {
    pub task_id: String,
    pub agent: String,
    pub event_name: &'static str,
    pub event_time: u32,
    pub interaction_id: String,
}
When I convert the above event_name to an owned type such as String, the code compiles.
rustc 1.68.0-nightly (270c94e48 2022-12-28)
async-fn-in-trait is enabled.
M1 Pro, ARM processor
Hello there!
Prepared statements are created only in the scope of a single node. With node observability fixed and load balancing enabled, this becomes a cause of query errors:
ERROR cdrs_tokio::transport: Transport error! error=Server error: ErrorBody { error_code: 9472, message: "Prepared query with ID dde1cb1db179403026499e16ff7ef1f5 not found (either the query was not prepared on this host (maybe the host has been restarted?) or you have prepared too many queries and it has been evicted from the internal cache)", additional_info:
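The error code 9472 in the log above is 0x2500, which the CQL native protocol defines as Unprepared. The usual client-side remedy is to re-prepare the statement on the node that rejected it and then retry a bounded number of times. A minimal sketch of that idea follows; `Node`, `ExecError`, and `execute_with_reprepare` are hypothetical stand-ins for illustration, not cdrs-tokio APIs.

```rust
use std::collections::HashSet;

/// CQL native protocol error code for "Unprepared" (decimal 9472).
const UNPREPARED: u32 = 0x2500;

#[derive(Debug, PartialEq)]
enum ExecError {
    Server { code: u32 },
}

/// Hypothetical stand-in for one cluster node with its own prepared-statement cache.
struct Node {
    prepared: HashSet<String>,
}

impl Node {
    /// Records the query as prepared on this node only.
    fn prepare(&mut self, query: &str) {
        self.prepared.insert(query.to_string());
    }

    /// Fails with Unprepared unless the query was prepared on this node.
    fn execute(&self, query: &str) -> Result<&'static str, ExecError> {
        if self.prepared.contains(query) {
            Ok("applied")
        } else {
            Err(ExecError::Server { code: UNPREPARED })
        }
    }
}

/// Executes `query`, re-preparing on the same node when it reports Unprepared.
/// Gives up and returns the error after `max_retries` re-prepare attempts.
fn execute_with_reprepare(
    node: &mut Node,
    query: &str,
    max_retries: usize,
) -> Result<&'static str, ExecError> {
    let mut attempts = 0;
    loop {
        match node.execute(query) {
            Err(ExecError::Server { code: UNPREPARED }) if attempts < max_retries => {
                node.prepare(query);
                attempts += 1;
            }
            other => return other,
        }
    }
}
```

With `max_retries` set to 0 the Unprepared error is surfaced to the caller unchanged, which is the behaviour reported here.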
I recently came across https://github.com/scylladb/scylla-rust-driver
And also saw https://www.reddit.com/r/rust/comments/pfuwhf/help_wanted_cdrstokio_road_to_performance_and/
So it seems you have seen scylla-rust-driver and that spurred a lot of the cleanup of cdrs's performance mistakes.
After evaluating the performance of scylla-rust-driver it still seems to considerably outperform master cdrs-tokio, in both full scale usage and protocol serializing/deserializing.
But you clearly seem dedicated to cdrs-tokio instead of jumping over to scylla-rust-driver, so what do you think cdrs-tokio does better than scylla-rust-driver in order for you to keep working on it?
The NodeTcpConfig struct holds a &str instead of owning a String.
This forbids writing the following trivial code:
let nodes: Vec<NodeTcpConfig<'_>> = service_discovery.services("cassandra").iter()
    .map(|srv| {
        let address = format!("{}:{}", srv.host, srv.port);
        NodeTcpConfigBuilder::new(&address, auth.clone()).build()
        // ^ returns a value referencing data owned by the current function
    })
    .collect();
A workaround is to collect all the addresses and then create the nodes, but I'm wondering: what are the benefits of putting a string slice into a config struct?
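The workaround mentioned above can be sketched in two steps: first build and own every address string, then create the borrowing configs from that longer-lived vector. `BorrowingNodeConfig` and `Service` below are hypothetical stand-ins that only mirror the `&str` field described in the post; they are not the real cdrs-tokio types.

```rust
/// Hypothetical stand-in for a config type that borrows its address
/// (mirrors the `&str` field described above, not the real NodeTcpConfig).
#[derive(Debug)]
struct BorrowingNodeConfig<'a> {
    addr: &'a str,
}

/// Hypothetical service-discovery record.
struct Service {
    host: String,
    port: u16,
}

/// Step 1: build and own every address string.
fn addresses(services: &[Service]) -> Vec<String> {
    services
        .iter()
        .map(|s| format!("{}:{}", s.host, s.port))
        .collect()
}

/// Step 2: borrow from the owned strings, which outlive the configs.
fn configs(addresses: &[String]) -> Vec<BorrowingNodeConfig<'_>> {
    addresses
        .iter()
        .map(|a| BorrowingNodeConfig { addr: a.as_str() })
        .collect()
}
```

The caller has to keep the `Vec<String>` alive for as long as the configs are in use, which is the ergonomic cost the question is about.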
I am trying to insert a compressed string of about 5 MB (after compression) as a byte array (Blob). When it is inserted, the actual size of the row is 23 MB, which exceeds the database hard limit (16 MB).
I checked the size of the array right before sending it to the driver. Could the size increase while the driver processes the data?
Thank you.
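One way to narrow this down is to compare the raw byte count with the size of common textual renderings of the same bytes. The sketch below is only a diagnostic aid, under the assumption (not confirmed by the post) that the blob might be turned into text somewhere before it reaches the wire, for example by being formatted into the query string. The function names are illustrative.

```rust
/// Length of the blob as a CQL hex literal: "0x" plus two hex digits per byte.
fn hex_literal_len(bytes: &[u8]) -> usize {
    2 + bytes.len() * 2
}

/// Length of the blob when rendered with Rust's `Debug` formatting,
/// e.g. "[0, 1, 255]".
fn debug_text_len(bytes: &[u8]) -> usize {
    format!("{:?}", bytes).len()
}

/// Growth factor of an encoded length relative to the raw byte count.
fn growth_factor(raw_len: usize, encoded_len: usize) -> f64 {
    encoded_len as f64 / raw_len as f64
}
```

If the observed growth matches one of these factors, the payload is probably being sent as text rather than bound as a binary value. If it matches neither, the growth happens elsewhere.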
May 03 17:32:17.022 ERROR main common_lib: Panic: panicked at 'attempt to shift left with overflow', /Users/alph/.cargo/registry/src/github.pie.apple.com-9396ac0411171c83/cdrs-tokio-6.2.0/src/retry/reconnection_policy.rs:119:22
thread 'main' panicked at 'attempt to shift left with overflow', /Users/alph/.cargo/registry/src/github.pie.apple.com-9396ac0411171c83/cdrs-tokio-6.2.0/src/retry/reconnection_policy.rs:119:22
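In Rust, shifting an integer by at least its bit width panics with this message when overflow checks are enabled (the default in debug builds). The sketch below assumes the computation involved is an exponential backoff of the form `base << attempt`. That is a guess about the kind of code at reconnection_policy.rs:119, not a copy of it, and `backoff_delay` with its parameters is illustrative.

```rust
use std::time::Duration;

/// Exponential backoff that cannot panic: the multiplier saturates instead of
/// overflowing, and the result is capped at `max`.
fn backoff_delay(base: Duration, max: Duration, attempt: u32) -> Duration {
    // 2^attempt, clamped to u32::MAX for large attempt counts.
    let factor = 2u32.saturating_pow(attempt);
    // checked_mul returns None on overflow; fall back to the cap in that case.
    base.checked_mul(factor).map_or(max, |delay| delay.min(max))
}
```

Capping the delay also keeps a long outage from producing waits of hours once the attempt counter grows.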
The link to the SSL session in this section: https://github.com/krojew/cdrs-tokio/tree/master/documentation
is broken and leads to https://github.com/krojew/cdrs-tokio/blob/master/documentation/cdrs-session.md
Is it possible to apply this fix to this crate, @krojew? AlexPikalov/cdrs#348
After introducing protocol v5, we need extensive end-to-end tests on a v5 cluster. Ideally, also regression checks with v4.
Such tests include running all types of queries with and without compression (everything from section 4 of https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec), listening to events, using high-level Rust-to-UDT (de)serialization, using different authentication methods.
Note: use the latest Cassandra 4.0 and the master branch, since the changes are not published yet.
Lz4-compress needs to be switched to Lz4_flex or lz4-fear.
There's no new method in the session module that accepts ClusterRustlsConfig. Thanks
When I try to run any test, I get this error message:
error[E0432]: unresolved import `winapi::shared::mstcpip`
--> C:\Users\me\.cargo\registry\src\github.com-1ecc6299db9ec823\mio-0.7.10\src\sys\windows\tcp.rs:13:5
|
13 | use winapi::shared::mstcpip;
| ^^^^^^^^^^^^^^^^^^^^^^^ no `mstcpip` in `shared`
I am running Windows 10. I had no problem running https://github.com/AlexPikalov/cdrs. Do you know why this error occurs?
Hello, I was wondering if it was possible to add an example of TLS config? Thank you!
Hi, I got the above error in https://github.com/krojew/cdrs-tokio/blob/master/cassandra-protocol/src/token.rs#L32 when trying to run session.exec_with_values.
Full backtrace is:
The application panicked (crashed).
Message: assertion failed: `(left == right)`
left: `128`,
right: `0`
Location: C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\backtrace-0.3.66\src\dbghelp.rs:297
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ BACKTRACE ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
⋮ 28 frames hidden ⋮
29: core::slice::index::slice_end_index_len_fail_rt<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120/library\core\src\slice\index.rs:76
30: core::slice::index::slice_end_index_len_fail<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120/library\core\src\slice\index.rs:69
31: core::slice::index::impl$3::index<u8><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\slice\index.rs:316
32: core::slice::index::impl$0::index<u8,core::ops::range::Range<usize> ><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\slice\index.rs:18
33: cassandra_protocol::token::Murmur3Token::generate<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cassandra-protocol-2.0.1\src\token.rs:32
34: core::ops::function::FnOnce::call_once<cassandra_protocol::token::Murmur3Token (*)(slice$<u8>),tuple$<slice$<u8> > ><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\ops\function.rs:248
35: enum2$<core::option::Option<slice$<u8> > >::map<slice$<u8>,cassandra_protocol::token::Murmur3Token,cassandra_protocol::token::Murmur3Token (*)(slice$<u8>)><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\option.rs:929
36: cdrs_tokio::load_balancing::topology_aware::impl$1::replicas_for_request::closure$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:77
37: enum2$<core::option::Option<cassandra_protocol::token::Murmur3Token> >::or_else<cassandra_protocol::token::Murmur3Token,cdrs_tokio::load_balancing::topology_aware::impl$1::replicas_for_request::closure_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio:<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\option.rs:1377
38: cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager>::replicas_for_request<cdrs_tokio::transport::TransportTcp,cdrs_tokio::clu<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:75
39: cdrs_tokio::load_balancing::topology_aware::impl$0::query_plan<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:45
40: cdrs_tokio::load_balancing::initializing_wrapper::impl$0::query_plan<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\initializing_wrapper.rs:27
41: cdrs_tokio::cluster::session::Session<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio::transport::TransportTcp,<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:574
42: cdrs_tokio::cluster::session::impl$1::send_envelope::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:610
43: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::send_envelope::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topolog<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
44: cdrs_tokio::cluster::session::impl$1::exec_with_params::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_to<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:276
45: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::exec_with_params::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topo<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
46: cdrs_tokio::cluster::session::impl$1::exec_with_values::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_to<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:385
47: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::exec_with_values::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topo<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
48: server_stats_v2::database::impl$1::get_keys::async_fn$0<unknown>
at C:\Users\marce\IdeaProjects\server_stats_v2\src\database.rs:196
49: core::future::from_generator::impl$1::poll<enum2$<server_stats_v2::database::impl$1::get_keys::async_fn_env$0> ><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
50: server_stats_v2::server::key::async_fn$0<unknown>
at C:\Users\marce\IdeaProjects\server_stats_v2\src\server.rs:93
51: core::future::from_generator::impl$1::poll<enum2$<server_stats_v2::server::key::async_fn_env$0> ><unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
52: axum::handler::impl$6::call::async_block$0<core::future::from_generator::GenFuture<enum2$<server_stats_v2::server::key::async_fn_env$0> > (*)(axum::extract::path::Path<alloc::string::String>,axum::extension::Extension<alloc::sync::Arc<server_stats_v2::dat<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\handler\mod.rs:250
53: core::future::from_generator::impl$1::poll<enum2$<axum::handler::impl$6::call::async_block_env$0<core::future::from_generator::GenFuture<enum2$<server_stats_v2::server::key::async_fn_env$0> > (*)(axum::extract::path::Path<alloc::string::String>,axum::exte<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
54: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::marker::Send>,alloc::all<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
55: futures_util::future::future::map::impl$2::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,co<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-util-0.3.25\src\future\future\map.rs:55
56: futures_util::future::future::impl$15::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-util-0.3.25\src\lib.rs:91
57: axum::handler::future::impl$4::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::marker::<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\macros.rs:42
58: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,enum2$<core::result::Result<http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
59: tower::util::oneshot::impl$3::poll<tower::util::boxed_clone::BoxCloneService<http::request::Request<hyper::body::body::Body>,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<cor<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-0.4.13\src\util\oneshot.rs:97
60: axum::routing::route::impl$5::poll<hyper::body::body::Body,enum2$<core::convert::Infallible> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\routing\route.rs:150
61: tower_http::map_response_body::impl$6::poll<axum::routing::route::RouteFuture<hyper::body::body::Body,enum2$<core::convert::Infallible> >,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> (*)(http_body::combinato<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-http-0.3.4\src\map_response_body.rs:204
62: tower_http::map_response_body::impl$6::poll<tower_http::map_response_body::ResponseFuture<axum::routing::route::RouteFuture<hyper::body::body::Body,enum2$<core::convert::Infallible> >,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axu<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-http-0.3.4\src\map_response_body.rs:204
63: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,enum2$<core::result::Result<http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
64: tower::util::oneshot::impl$3::poll<tower::util::boxed_clone::BoxCloneService<http::request::Request<hyper::body::body::Body>,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<cor<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-0.4.13\src\util\oneshot.rs:97
65: axum::routing::route::impl$5::poll<hyper::body::body::Body,enum2$<core::convert::Infallible> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\routing\route.rs:150
66: hyper::proto::h1::dispatch::impl$6::poll_msg<axum::routing::Router<hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:491
67: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:297
68: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:161
69: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:137
70: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:120
71: hyper::proto::h1::dispatch::impl$1::poll<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server:<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:424
72: hyper::server::conn::impl$6::poll<hyper::server::tcp::addr_stream::AddrStream,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,axum::routing::Router<hyper::body::body::Body>,enum2$<hyper::common::exec::Exec> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\conn.rs:952
73: hyper::server::conn::upgrades::impl$1::poll<hyper::server::tcp::addr_stream::AddrStream,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,axum::routing::Router<hyper::body::body::Body>,enum2$<hyper::common::exec:<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\conn.rs:1012
74: hyper::server::server::new_svc::impl$1::poll<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyper::body::body::Body>,enum2$<core::c<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\server.rs:741
75: tokio::runtime::task::core::impl$3::poll::closure$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routi<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:184
76: tokio::loom::std::unsafe_cell::UnsafeCell<enum2$<tokio::runtime::task::core::Stage<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\loom\std\unsafe_cell.rs:14
77: tokio::runtime::task::core::CoreStage<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyp<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:174
78: tokio::runtime::task::harness::poll_future::closure$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::rou<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:480
79: core::panic::unwind_safe::impl$23::call_once<enum2$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_ma<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\panic\unwind_safe.rs:271
80: std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::Into<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:492
81: std::panicking::try::do_catch<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::impl$1::complete::closure_env$0<core::future::from_generator::GenFuture<enum2$<cdrs_tokio::cluster::control_connection::impl$0::process_events::async_b<unknown>
at <unknown source file>:<unknown line>
82: std::panicking::try<enum2$<core::task::poll::Poll<tuple$<> > >,core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:456
83: std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMake<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panic.rs:137
84: tokio::runtime::task::harness::poll_future<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Route<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:468
85: tokio::runtime::task::harness::Harness<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hy<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:104
86: tokio::runtime::task::harness::Harness<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hy<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:57
87: tokio::runtime::task::raw::poll<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyper::bo<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:194
88: tokio::runtime::task::raw::RawTask::poll<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:134
89: tokio::runtime::task::LocalNotified<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::worker::Shared> >::run<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::worker::Shared> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\mod.rs:385
90: tokio::runtime::scheduler::multi_thread::worker::impl$1::run_task::closure$0<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:444
91: tokio::coop::with_budget::closure$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,alloc::alloc::Global>,tuple$<> > >,tokio::runtime::scheduler::multi_thread::worker::impl$1::run_task::closure_env$0><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:102
92: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::try_with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure_env$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Cor<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\thread\local.rs:445
93: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure_env$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,al<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\thread\local.rs:421
94: tokio::coop::with_budget<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:95
95: tokio::coop::budget<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:72
96: tokio::runtime::scheduler::multi_thread::worker::Context::run_task<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:420
97: tokio::runtime::scheduler::multi_thread::worker::Context::run<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:387
98: tokio::runtime::scheduler::multi_thread::worker::run::closure$0<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:372
99: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::scheduler::multi_thread::worker::Context>::set<tokio::runtime::scheduler::multi_thread::worker::Context,tokio::runtime::scheduler::multi_thread::worker::run::closure_env$0,tuple$<> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\macros\scoped_tls.rs:61
100: tokio::runtime::scheduler::multi_thread::worker::run<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:369
101: tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure$0<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:348
102: tokio::runtime::blocking::task::impl$2::poll<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0,tuple$<> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\task.rs:42
103: tokio::runtime::task::core::impl$3::poll::closure$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> ><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:184
104: tokio::loom::std::unsafe_cell::UnsafeCell<enum2$<tokio::runtime::task::core::Stage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> > > >::with_mut<enum2$<tokio::runtime::task::co<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\loom\std\unsafe_cell.rs:14
105: tokio::runtime::task::core::CoreStage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> >::poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:174
106: tokio::runtime::task::harness::poll_future::closure$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:480
107: core::panic::unwind_safe::impl$23::call_once<enum2$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch:<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\panic\unwind_safe.rs:271
108: std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:492
109: std::panicking::try::do_catch<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$1<tokio::runtime::blocking::task::BlockingTask<tokio::net::addr::impl$17::to_socket_addrs::closure_env$0>,tokio::runtime::bloc<unknown>
at <unknown source file>:<unknown line>
110: std::panicking::try<enum2$<core::task::poll::Poll<tuple$<> > >,core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worke<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:456
111: std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,toki<unknown>
at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panic.rs:137
112: tokio::runtime::task::harness::poll_future<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:468
113: tokio::runtime::task::harness::Harness<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule>::poll_inner<tokio::runtime::blocking::tas<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:104
114: tokio::runtime::task::harness::Harness<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule>::poll<tokio::runtime::blocking::task::Blo<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:57
115: tokio::runtime::task::raw::poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:194
116: tokio::runtime::task::raw::RawTask::poll<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:134
117: tokio::runtime::task::UnownedTask<tokio::runtime::blocking::schedule::NoopSchedule>::run<tokio::runtime::blocking::schedule::NoopSchedule><unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\mod.rs:422
118: tokio::runtime::blocking::pool::Task::run<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:111
119: tokio::runtime::blocking::pool::Inner::run<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:346
120: tokio::runtime::blocking::pool::impl$5::spawn_thread::closure$0<unknown>
at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:321
⋮ 15 frames hidden ⋮
Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Run with RUST_BACKTRACE=full to include source snippets.
thread panicked while panicking. aborting.
error: process didn't exit successfully: `target\debug\server_stats_v2.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN)
We have a node which is physically up (the machine is on), but not accepting connections.
We're seeing connection timeouts, but neither the retry policy nor the reconnection policy seems to trigger.
When we tested it, it went through this line:
In such a case, shouldn't it move to the next node and mark this one as temporarily down?
Add speculative execution policies: https://docs.datastax.com/en/developer/java-driver/4.13/manual/core/speculative_execution/
Something like this in CI would be useful to add for code quality:
- name: Format check
  run: cargo fmt --verbose --all -- --check
- name: Clippy check
  run: cargo clippy --verbose --examples --tests -- -D warnings
I think I've found a bug in transport.rs. Had some trouble finding a fix, perhaps @krojew will fare better since you wrote the code. Here's a repo with everything needed: https://github.com/conorbros/cdrs-bug-report
In the logs you can see that the "receiving on an empty channel" error occurs frequently and then the driver sends some garbage bytes that aren't a valid frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]. I think Cassandra itself is ignoring this but I noticed it because it was causing problems in Shotover.
2022-07-12T04:43:03.166569Z DEBUG cdrs_tokio::cluster::control_connection: Establishing new control connection...
2022-07-12T04:43:03.166615Z DEBUG cdrs_tokio::cluster::topology::node: Establishing new connection to node...
2022-07-12T04:43:03.166828Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.166856Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.166861Z DEBUG cdrs_tokio::transport: writing frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]
2022-07-12T04:43:03.166886Z DEBUG cdrs_tokio::transport: writing frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]
2022-07-12T04:43:03.167801Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.167797Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.167828Z DEBUG cdrs_tokio::transport: writing frame: [21, 0, 2, 21, 62, 8d, 5, 0, 0, 2, f, 0, 0, 0, 18, 0, 0, 0, 14, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, c5, 9c, 7a, 7e]
2022-07-12T04:43:03.167823Z DEBUG cdrs_tokio::transport: writing frame: [21, 0, 2, 21, 62, 8d, 5, 0, 0, 2, f, 0, 0, 0, 18, 0, 0, 0, 14, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, c5, 9c, 7a, 7e]
2022-07-12T04:43:03.234758Z DEBUG cdrs_tokio::cluster::control_connection: Established new control connection.
2022-07-12T04:43:03.234789Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.234814Z DEBUG cdrs_tokio::transport: writing frame: [30, 0, 2, 95, 3f, 3e, 5, 0, 0, 3, 7, 0, 0, 0, 27, 0, 0, 0, 1d, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 70, 65, 65, 72, 73, 5f, 76, 32, 0, 1, 0, 0, 0, 0, d1, af, 84, 6b]
2022-07-12T04:43:03.234880Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.234904Z DEBUG cdrs_tokio::transport: writing frame: [95, 0, 2, f9, 75, c5, 5, 0, 0, 3, 7, 0, 0, 0, 24, 0, 0, 0, 1a, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 6c, 6f, 63, 61, 6c, 0, 1, 0, 0, 0, 0, 5, 0, 0, 4, 7, 0, 0, 0, 5f, 0, 0, 0, 55, 53, 45, 4c, 45, 43, 54, 20, 6b, 65, 79, 73, 70, 61, 63, 65, 5f, 6e, 61, 6d, 65, 2c, 20, 74, 6f, 4a, 73, 6f, 6e, 28, 72, 65, 70, 6c, 69, 63, 61, 74, 69, 6f, 6e, 29, 20, 41, 53, 20, 72, 65, 70, 6c, 69, 63, 61, 74, 69, 6f, 6e, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 5f, 73, 63, 68, 65, 6d, 61, 2e, 6b, 65, 79, 73, 70, 61, 63, 65, 73, 0, 1, 0, 0, 0, 0, b3, 6d, 53, 19]
2022-07-12T04:43:03.236852Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.236886Z DEBUG cdrs_tokio::transport: writing frame: [30, 0, 2, 95, 3f, 3e, 5, 0, 0, 5, 7, 0, 0, 0, 27, 0, 0, 0, 1d, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 70, 65, 65, 72, 73, 5f, 76, 32, 0, 1, 0, 0, 0, 0, fb, 52, 12, 14]
2022-07-12T04:43:03.237775Z DEBUG cdrs_tokio::cluster::metadata_builder: Copying contact point. node_info=NodeInfo { host_id: 491c454d-f62e-433e-95f0-e864c89bf2c3, broadcast_rpc_address: 127.0.0.1:9043, broadcast_address: Some(192.168.160.2:7000), datacenter: "datacenter1", rack: "rack1" }
2022-07-12T04:43:03.237985Z WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.238005Z DEBUG cdrs_tokio::transport: writing frame: [3a, 0, 2, 73, 61, 83, 5, 0, 0, 6, b, 0, 0, 0, 31, 0, 3, 0, d, 53, 43, 48, 45, 4d, 41, 5f, 43, 48, 41, 4e, 47, 45, 0, d, 53, 54, 41, 54, 55, 53, 5f, 43, 48, 41, 4e, 47, 45, 0, f, 54, 4f, 50, 4f, 4c, 4f, 47, 59, 5f, 43, 48, 41, 4e, 47, 45, d2, a7, 16, 43]
result : Envelope { version: V5, direction: Response, flags: (empty), opcode: Result, stream_id: 3, tracing_id: None, warnings: [] }
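The logged frame bytes can be checked against the protocol v5 framing layout. The following is a minimal sketch, assuming the uncompressed v5 frame header as I understand it from the native protocol spec: the first three bytes, read little-endian, carry a 17-bit payload length and a self-contained flag, followed by a 3-byte CRC24 that is not verified here. The names `V5HeaderSketch` and `decode_v5_header` are made up for illustration and are not part of cdrs-tokio. Under that assumption, the `[0, 0, 2, ...]` bytes decode as a self-contained frame with an empty payload, and `[21, 0, 2, ...]` as a frame with a 33-byte payload, which matches the envelope that follows it in the log.

```rust
/// Decoded fields of an uncompressed protocol v5 frame header (sketch only).
#[derive(Debug, PartialEq, Eq)]
pub struct V5HeaderSketch {
    /// Payload length in bytes (17 bits).
    pub payload_len: usize,
    /// Whether the frame is flagged as self-contained.
    pub self_contained: bool,
}

/// Decodes the first three header bytes, read as a little-endian integer.
/// The three CRC24 bytes that follow on the wire are not checked here.
pub fn decode_v5_header(first_three: [u8; 3]) -> V5HeaderSketch {
    let raw = u32::from_le_bytes([first_three[0], first_three[1], first_three[2], 0]);
    V5HeaderSketch {
        // Low 17 bits: payload length.
        payload_len: (raw & 0x1_FFFF) as usize,
        // Bit 17: self-contained flag.
        self_contained: (raw >> 17) & 1 == 1,
    }
}
```

Whether the driver should be sending an empty frame at all is a separate question that this sketch does not answer.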
I have set up a Cassandra cluster of 6 pods using the Helm charts, and I am connecting to that cluster through one known node. The application works fine, but occasionally its CPU usage maxes out and the application becomes unresponsive. I sampled the CPU usage for 1 second during one of the peaks and found that 99% of the CPU time is spent in cdrs_tokio::transport::AsyncTransport::start_processing. I have attached the complete flamegraph SVG for your reference here. I also observe that after a few hours of full CPU usage, the usage does come down to 50% (but that is still high).
The flamegraph was captured without debug symbols since the problem happens only occasionally. Below is the code that shows how I make the connection; here CASSANDRA_HOST is the name of the service of the K8s application.
pub async fn create_cassandra_session() -> anyhow::Result<CassandraSession> {
    let url = format!(
        "{}:{}",
        std::env::var("CASSANDRA_HOST").unwrap_or("127.0.0.1".to_string()),
        std::env::var("CASSANDRA_PORT").unwrap_or("9042".to_string())
    );
    tracing::info!("Connecting to cassandra at {url}");
    let cluster_config = if let (Some(username), Some(password)) = (
        std::env::var("CASSANDRA_USERNAME").ok(),
        std::env::var("CASSANDRA_PASSWORD").ok(),
    ) {
        tracing::info!("Cassandra static credentials read from environment");
        let authenticator = StaticPasswordAuthenticatorProvider::new(username, password);
        NodeTcpConfigBuilder::new()
            .with_contact_point(url.into())
            .with_authenticator_provider(Arc::new(authenticator))
            .build()
            .await?
    } else {
        tracing::info!("Cassandra is connecting without credentials");
        NodeTcpConfigBuilder::new()
            .with_contact_point(url.into())
            .build()
            .await?
    };
    Ok(TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config).build()?)
}
Hello 👋
Is there any way to get tracing information when making Cassandra queries with this crate? I had hoped to get various spans showing the Cassandra connections and query executions, but I can't find any way to get them. Am I missing something, or is this feature not available as of now?
Looking at the cassandra protocol spec and our current usage of frame.stream it makes a lot of sense for stream_id to be in Frame.
What was the reasoning behind moving it out of Frame?
Instead of additional connection with an iterator, provide a broadcast channel.
When a SessionBuilder (e.g. TcpSessionBuilder) build() method is called, a Session is returned.
However there are still background tasks that need to complete before the session can actually be used.
Two examples I have hit are:
Cannot find node 127.0.0.1:9042 for statement re-preparation!
I think a reasonable solution to this would look like adding a tokio::sync::oneshot that gets sent when the session tasks have finished initializing and then the build() method can await on it.
This can be worked around locally by adding sleeps, but that is clearly not ideal.
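The proposed readiness signal can be sketched without any driver code. This is a minimal illustration that swaps in `std::sync::mpsc` and a plain thread for `tokio::sync::oneshot` and an async task, so that it runs with the standard library only; in the driver itself the receiver would be awaited instead of blocking. `ReadySession` and `build_and_wait` are hypothetical names, and the "discovery" step is a placeholder for whatever the background tasks actually do.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a session whose background setup has finished.
pub struct ReadySession {
    pub known_nodes: Vec<String>,
}

/// Spawns the background setup and returns only after it signals completion.
pub fn build_and_wait(contact_point: &str) -> Result<ReadySession, mpsc::RecvError> {
    let (ready_tx, ready_rx) = mpsc::channel::<Vec<String>>();
    let contact_point = contact_point.to_string();

    // Background work: placeholder for topology discovery and event registration.
    thread::spawn(move || {
        let discovered = vec![contact_point];
        // A send error only means the builder stopped waiting, so it is ignored.
        let _ = ready_tx.send(discovered);
    });

    // Block until setup reports in; Err means the task ended without signalling.
    let known_nodes = ready_rx.recv()?;
    Ok(ReadySession { known_nodes })
}
```

With this shape the caller never observes a session whose node list is still empty, which is the situation the sleeps currently paper over.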
We have now upstreamed all our changes and would appreciate a 6.0.0-beta.4 release.
There are still a lot more cassandra-protocol improvements to be made before a proper 6.0.0 release though.
Hello :-)
I tried to use TcpSessionBuilder with .with_keyspace, but saw high CPU and network usage.
When I started digging, I found that the start_reading and start_writing methods are stuck in an infinite loop.
It looks like this happens because, in the start_reading method, Cassandra always returns a response with a SetKeyspace result:
[cdrs-tokio-6.2.0/cdrs-tokio/src/transport.rs:320] &body = Result(
SetKeyspace(
BodyResResultSetKeyspace {
body: "my_keyspace",
},
),
)
When I removed .with_keyspace, everything worked fine.
Hi team,
I'd like to raise this question again. It was asked a while ago in AlexPikalov/cdrs#282, but I think it deserves better handling.
The question is basically about allowing container fields like Vec<T> in the structs that make use of TryFromUdt or TryFromRow. Currently, if a table contains an empty list, try_from_udt (try_from_row) will fail with the error above.
The proposed solution used to be to wrap all the containers in Option, but that is quite inconvenient and looks like overkill.
An empty list is a valid construct and I do not see a point in requiring that wrapping from the application-level logic.
In our fork of the repo, we have a fix that allows empty collections. I'd like to check if there are any objections to upstreaming it.
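The behaviour being asked for can be sketched apart from the derive macros. This assumes that the decoder sees an empty list as a missing (null) column value, which is why the `Option` wrapper was suggested in the first place; `list_or_empty` is a hypothetical helper, not the fix from the fork.

```rust
/// Converts an optionally decoded list into a plain `Vec`, treating a missing
/// value as an empty list instead of an error.
pub fn list_or_empty<T>(decoded: Option<Vec<T>>) -> Vec<T> {
    decoded.unwrap_or_default()
}
```

A derive macro that applied this rule to container fields would let application structs use `Vec<T>` directly.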
Thanks,
-alph
This test below fails, but I think it should work fine. The original CDRS has exactly the same problem.
mod test {
    use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig, session};
    use cdrs_tokio::load_balancing::RoundRobin;
    use cdrs_tokio::query::QueryExecutor;
    use cdrs_tokio::authenticators::NoneAuthenticator;
    use std::sync::Arc;

    #[tokio::test]
    async fn goo() {
        let node = NodeTcpConfigBuilder::new("127.0.0.1:9042", NoneAuthenticator {})
            .build();
        let cluster_config = ClusterTcpConfig(vec![node]);
        let session = session::new(&cluster_config, RoundRobin::new()).await.unwrap();
        // Set the keyspace
        session.query("use benjamin").await.unwrap();
        // This works!
        session.query("select * from conversation").await.unwrap().get_body().unwrap().into_rows().unwrap();
        let session = Arc::new(session);
        let f = (0..=10)
            .into_iter()
            .map(|_| {
                let session = Arc::clone(&session);
                tokio::spawn(async move {
                    // Scylla complaining about not having a keyspace
                    session.query("select * from conversation").await.unwrap().get_body().unwrap().into_rows().unwrap();
                })
            })
            .collect::<Vec<_>>();
        for result in futures::future::join_all(f).await {
            assert!(result.is_ok());
        }
    }
}
Error:
assertion failed: result.is_ok()
thread 'test::goo' panicked at 'called `Result::unwrap()` on an `Err` value: Server(CDRSError { error_code: 8704, message: CString { string: "No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename" }, additional_info: Invalid(SimpleError) })', src\main.rs:27:71
stack backtrace:
0: std::panicking::begin_panic_handler
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\std\src\panicking.rs:493
1: core::panicking::panic_fmt
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\panicking.rs:92
2: core::option::expect_none_failed
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\option.rs:1266
3: core::result::Result<cdrs_tokio::frame::Frame, cdrs_tokio::error::Error>::unwrap<cdrs_tokio::frame::Frame,cdrs_tokio::error::Error>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\result.rs:969
4: fortesting::test::goo::{{closure}}::{{closure}}::{{closure}}
at .\src\main.rs:27
5: core::future::from_generator::{{impl}}::poll<generator-0>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\future\mod.rs:80
6: tokio::runtime::task::core::{{impl}}::poll::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:235
7: tokio::loom::std::unsafe_cell::UnsafeCell<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>>::with_mut<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>,core::task::poll::Poll<tup
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\loom\std\unsafe_cell.rs:14
8: tokio::runtime::task::core::CoreStage<core::future::from_generator::GenFuture<generator-0>>::poll<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:225
9: tokio::runtime::task::harness::poll_future::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:422
10: std::panic::{{impl}}::call_once<core::task::poll::Poll<tuple<>>,closure-0>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:322
11: std::panicking::try::do_call<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:379
12: std::panicking::try::do_catch<std::panic::AssertUnwindSafe<closure-0>,tuple<>>
13: std::panicking::try<core::task::poll::Poll<tuple<>>,std::panic::AssertUnwindSafe<closure-0>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:343
14: std::panic::catch_unwind<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:396
15: tokio::runtime::task::harness::poll_future<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:409
16: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll_inner<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:89
17: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_sched
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:59
18: tokio::runtime::task::raw::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:104
19: tokio::runtime::task::raw::RawTask::poll
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:66
20: tokio::runtime::task::Notified<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::run<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\mod.rs:171
21: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
22: tokio::coop::with_budget::{{closure}}<tuple<>,closure-3>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:121
23: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::try_with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:272
24: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:248
25: tokio::coop::with_budget
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:114
26: tokio::coop::budget
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:98
27: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
28: tokio::runtime::basic_scheduler::enter::{{closure}}<closure-0,tuple<>,tokio::runtime::driver::Driver>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
29: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,closure-0,tuple<>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\macros\scoped_tls.rs:61
30: tokio::runtime::basic_scheduler::enter<closure-0,tuple<>,tokio::runtime::driver::Driver>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
31: tokio::runtime::basic_scheduler::Inner<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:176
32: tokio::runtime::basic_scheduler::InnerGuard<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:405
33: tokio::runtime::basic_scheduler::BasicScheduler<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:136
34: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<generator-0>>
at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\mod.rs:450
35: fortesting::test::goo
at .\src\main.rs:8
36: fortesting::test::goo::{{closure}}
at .\src\main.rs:8
37: core::ops::function::FnOnce::call_once<closure-0,tuple<>>
at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\ops\function.rs:227
38: core::ops::function::FnOnce::call_once
at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\library\core\src\ops\function.rs:227
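The server message in the error above suggests naming the keyspace explicitly. A possible workaround, assuming (this is not confirmed in the report) that `use benjamin` only took effect on the one connection that executed it, is to qualify the table name in every statement. `select_all` is a hypothetical helper written for this sketch.

```rust
/// Builds a SELECT that names the keyspace explicitly, so the statement does
/// not depend on which pooled connection previously ran `USE`.
pub fn select_all(keyspace: &str, table: &str) -> String {
    format!("SELECT * FROM {keyspace}.{table}")
}
```

In the test above, the spawned tasks would then run the string returned by `select_all("benjamin", "conversation")` instead of the unqualified query.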
Hello. I am not sure if this is a bug or user error. When I run my test suite against a database, at least a couple of the tests fairly consistently fail with the error "Connection closed while waiting for response". I have tried running the tests in single-threaded mode and adding connections to the connection pool, but nothing seems to work. The number of failures varies, the tests which fail vary, etc. Since it also happens when running with "--jobs 1", I suspect it is not a threading issue.
At the same time, I see cassandra logs that say things like "Unknown exception in client networking" and "Connection reset by peer". I am running against a single node locally running cassandra test cluster.
I guess my question is whether this seems to be a bug or whether I am supposed to be handling intermittent connection failures like this in my client code.
Hi there,
I switched from cdrs to cdrs-tokio a few days ago and started noticing some app lock-ups. They usually happen when a large frame is being sent. This is similar to the issue fixed in #4, but at a larger threshold: it now happens when sending frames larger than ~0x20000 bytes.
Sorry, cannot provide a test case. I'll try to track it down.
Cheers,
-alph
I was wondering if there is any way of using paging as described here: https://docs.datastax.com/en/developer/java-driver/4.14/manual/core/paging/ ?
I have a select query that returns many rows, about 60Mb combined. The performance is slow (~0.8 sec). I think this is a paging issue.
let response = session
    .query_with_params(
        SELECT_CQL,
        StatementParamsBuilder::new()
            .with_values(query_values!(id, month))
            .build(),
    )
    .await?
    .response_body()?
    .into_rows();
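The general shape of paging, as opposed to fetching everything in one response as above, can be sketched with an in-memory stand-in. Nothing here is cdrs-tokio API: `Page`, `fetch_page` and `for_each_page` are hypothetical, and the `usize` offset stands in for the opaque paging state a server would return.

```rust
/// One page of results plus the position to resume from, if any.
pub struct Page {
    pub rows: Vec<u32>,
    pub next_state: Option<usize>,
}

/// Hypothetical stand-in for a paged query against an in-memory "table".
pub fn fetch_page(table: &[u32], state: Option<usize>, page_size: usize) -> Page {
    let start = state.unwrap_or(0);
    let end = (start + page_size).min(table.len());
    Page {
        rows: table[start..end].to_vec(),
        // A further page exists only if rows remain after this one.
        next_state: if end < table.len() { Some(end) } else { None },
    }
}

/// Drains every page, handing each one to `on_page` as it arrives.
pub fn for_each_page(table: &[u32], page_size: usize, mut on_page: impl FnMut(&[u32])) {
    // A zero page size would never advance, so reject it up front.
    assert!(page_size > 0, "page_size must be positive");
    let mut state = None;
    loop {
        let page = fetch_page(table, state, page_size);
        on_page(&page.rows);
        match page.next_state {
            Some(next) => state = Some(next),
            None => break,
        }
    }
}
```

Processing each page as it arrives keeps memory bounded by the page size, though it does not by itself reduce the total transfer time for a large result.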
Hey,
I was trying to write some logic to issue a paged query using a prepared statement, however the type that exec returns, ExecPager, isn't exported from cdrs_tokio::cluster. The struct is marked as public, however it doesn't appear to offer any interface to convert it into a type that I do have access to. My primary reason for wanting access to the type is so that I can return it as the result of a helper function. https://docs.rs/cdrs-tokio/3.0.0/src/cdrs_tokio/cluster/pager.rs.html#174
let mut pager = self.conn.paged(5);
let query = pager.exec(&prepared_query);
Am I just misunderstanding the API? or is this a quick fix?
I am getting the error UnexpectedWriteType("CAS") when doing batch inserts from multiple tasks in the same Session object.
It fails intermittently on a three-node cluster with Cassandra version [cqlsh 6.0.0 | Cassandra 4.0.5 | CQL spec 3.4.5 | Native protocol v5]. It works consistently with a single task.
On a local Cassandra instance in Docker (version 4.0.6), it works consistently with multiple tasks.
It seems this project's fork predates cassandra-proto.
I am in need of a cassandra protocol crate that can support response encoding and query decoding for the purposes of implementing a proxy.
Would that be out of scope of cdrs-tokio's protocol functionality?
Would you be interested in pulling cdrs-tokio's protocol functionality into a cassandra-proto like crate? (I am happy to put in the work to make this happen)
I am currently looking at getting cassandra-proto cleaned up and improved at https://github.com/shotover/cassandra-proto
But this crate is fairly active so it would be a shame to duplicate work.
I would be happy to collaborate in any way that makes sense for your project: possibly a subcrate in this repo or possibly another separate repo.
Consider the following code:
let user = "cassandra";
let password = "cassandra";
let auth = StaticPasswordAuthenticatorProvider::new(&user, &password);
let config = NodeTcpConfigBuilder::new()
    .with_contact_point("127.0.0.1:9042".into())
    .with_authenticator_provider(Arc::new(auth))
    .build()
    .await
    .unwrap();
let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config)
    .build()
    .unwrap();
let mut event_recv = session.create_event_receiver();
sleep(Duration::from_secs(3)).await; // let the driver finish connecting to the cluster and registering for the events
let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_events_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
session.query(create_ks).await.unwrap();
let event = timeout(Duration::from_secs(10), event_recv.recv())
    .await
    .unwrap()
    .unwrap();
Before bc021f4, event_recv.recv() would complete; with that commit it now times out.
Migrate all try_from_bytes in types to X::from_be_bytes().
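As a sketch of what such a migration could look like for two integer types: the standard library's `from_be_bytes` takes a fixed-size array, so the slice is first converted with `try_into`, which also gives a length check. The function names below are illustrative and are not the crate's existing helpers.

```rust
use std::array::TryFromSliceError;
use std::convert::TryInto;

/// Decodes a big-endian `i32` from exactly four bytes.
pub fn int_from_bytes(bytes: &[u8]) -> Result<i32, TryFromSliceError> {
    // Fails if the slice is not exactly 4 bytes long.
    let array: [u8; 4] = bytes.try_into()?;
    Ok(i32::from_be_bytes(array))
}

/// Decodes a big-endian `i64` from exactly eight bytes.
pub fn bigint_from_bytes(bytes: &[u8]) -> Result<i64, TryFromSliceError> {
    // Fails if the slice is not exactly 8 bytes long.
    let array: [u8; 8] = bytes.try_into()?;
    Ok(i64::from_be_bytes(array))
}
```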
Since we don't have async traits and the Session is generic, we need to provide a wrapper over an arbitrary inner LB.
For AWS Keyspaces, the authentication challenge includes a nonce key that is required for signing the auth token (when using AWS credentials for Cassandra/Keyspaces access).
I think the most straightforward way to do this is to pass the auth challenge response body into Authenticator.auth_token(). You can see from the AWS SigV4 Java plugin that this is how the DataStax driver works for its auth plugins.
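A rough sketch of the proposed shape: the authenticator receives the challenge body and derives its token from it. Everything here is hypothetical, including the trait name, the `nonce=<value>` layout assumed for the challenge, and the toy `EchoNonce` implementation, which only echoes the nonce and performs no real signing.

```rust
/// Hypothetical authenticator shape that receives the server challenge.
pub trait ChallengeAuthenticator {
    fn auth_token(&self, challenge: &[u8]) -> Vec<u8>;
}

/// Extracts the value of a `nonce=<value>` pair from a challenge body.
/// The `nonce=` layout is an assumption made for this sketch.
pub fn extract_nonce(challenge: &[u8]) -> Option<&[u8]> {
    const KEY: &[u8] = b"nonce=";
    let start = challenge.windows(KEY.len()).position(|w| w == KEY)? + KEY.len();
    let rest = &challenge[start..];
    // The value runs to the next comma, or to the end of the challenge.
    let end = rest.iter().position(|&b| b == b',').unwrap_or(rest.len());
    Some(&rest[..end])
}

/// Toy implementation: prefixes the nonce instead of signing anything.
pub struct EchoNonce;

impl ChallengeAuthenticator for EchoNonce {
    fn auth_token(&self, challenge: &[u8]) -> Vec<u8> {
        // A missing nonce yields an empty value in this toy version.
        let nonce = extract_nonce(challenge).unwrap_or_default();
        [b"signed:".as_slice(), nonce].concat()
    }
}
```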
The Authenticator trait requires Self: Sized, because it is Clone, so it is not possible to have Box<dyn Authenticator>.
The Authenticator impl type is tied to the Session type. This means you can't abstract over the authenticator at runtime, e.g. initialize a Session from authentication config known only at runtime and then let all the other code use a single Session type.
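One way to lift both restrictions is to drop the `Clone` supertrait and share the authenticator behind an `Arc<dyn ...>`. The sketch below uses made-up names (`AuthenticatorSketch`, `SessionSketch`, `authenticator_from_config`) and a single placeholder method; it only shows that the session type no longer depends on the concrete authenticator.

```rust
use std::sync::Arc;

/// Object-safe sketch: no `Clone` supertrait, so `dyn AuthenticatorSketch` is allowed.
pub trait AuthenticatorSketch: Send + Sync {
    /// Placeholder method; returns a label for the mechanism in use.
    fn name(&self) -> &str;
}

pub struct NoAuth;

pub struct PasswordAuth {
    pub username: String,
    pub password: String,
}

impl AuthenticatorSketch for NoAuth {
    fn name(&self) -> &str {
        "none"
    }
}

impl AuthenticatorSketch for PasswordAuth {
    fn name(&self) -> &str {
        "password"
    }
}

/// Session-like type that is not generic over the authenticator.
pub struct SessionSketch {
    authenticator: Arc<dyn AuthenticatorSketch>,
}

impl SessionSketch {
    pub fn new(authenticator: Arc<dyn AuthenticatorSketch>) -> Self {
        Self { authenticator }
    }

    pub fn authenticator_name(&self) -> &str {
        self.authenticator.name()
    }
}

/// Picks an authenticator from configuration known only at runtime.
pub fn authenticator_from_config(
    credentials: Option<(String, String)>,
) -> Arc<dyn AuthenticatorSketch> {
    match credentials {
        Some((username, password)) => Arc::new(PasswordAuth { username, password }),
        None => Arc::new(NoAuth),
    }
}
```

Cloning the `Arc` is cheap, so connections that each need a handle to the authenticator can still get one without a `Clone` bound on the trait.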
We should add #[non_exhaustive] to enums which are likely to change upstream.
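A small sketch of the effect, using a made-up enum rather than one of the crate's own: with `#[non_exhaustive]`, code in other crates must include a wildcard arm when matching, so adding a variant later is not a breaking change for them.

```rust
/// Sketch of an enum that may gain variants in later releases.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConsistencySketch {
    One,
    Quorum,
    All,
}

/// Maps a level to a label, falling back for variants not handled explicitly.
pub fn describe(level: ConsistencySketch) -> &'static str {
    match level {
        ConsistencySketch::One => "one",
        ConsistencySketch::Quorum => "quorum",
        // Downstream crates are required to have a wildcard arm like this one.
        _ => "other",
    }
}
```

Inside the defining crate the attribute changes nothing; the wildcard requirement applies only to other crates.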