
CDRS tokio


CDRS tokio - async Apache Cassandra driver using tokio

CDRS is a production-ready Apache Cassandra driver written in pure Rust. Like its Java counterpart, it focuses on providing a high level of configurability to suit most use cases at any scale, while also leveraging the safety and performance of Rust.

Features

  • Asynchronous API;
  • TCP/TLS connection (rustls);
  • Topology-aware dynamic and configurable load balancing;
  • Configurable connection strategies and pools;
  • Configurable speculative execution;
  • LZ4, Snappy compression;
  • Cassandra-to-Rust data serialization/deserialization with custom type support;
  • Pluggable authentication strategies;
  • ScyllaDB support;
  • Server events listening;
  • Multiple CQL version support (3, 4, 5), full spec implementation;
  • Query tracing information;
  • Prepared statements;
  • Query paging;
  • Batch statements;
  • Configurable retry and reconnection policy;
  • Support for interleaved queries;
  • Support for Yugabyte YCQL JSONB;
  • Support for beta protocol usage;

Performance

Due to the high configurability of CDRS, performance will vary depending on the use case. The following benchmarks were made against the latest (master as of 03-12-2021) versions of the respective libraries (except cassandra-cpp: 2.16.0), using protocol version 4.

  • cdrs-tokio-large-pool - CDRS with node connection pool equal to double of physical CPU cores
  • cdrs-tokio-small-pool - CDRS with a single connection per node
  • scylladb-rust-large-pool - scylla crate with node connection pool equal to double of physical CPU cores
  • scylladb-rust-small-pool - scylla crate with a single connection per node
  • cassandra-cpp - Rust bindings for Datastax C++ Driver, running on multiple threads using Tokio
  • gocql - a driver written in Go

Benchmark charts (not reproduced here): insert, select, and mixed workloads.

With knowledge of the given use case, CDRS can be tuned for peak performance.

Documentation and examples

Getting started

This example configures a cluster consisting of a single node without authentication, and uses round-robin load balancing. Other options are kept as default.

use cdrs_tokio::cluster::session::{TcpSessionBuilder, SessionBuilder};
use cdrs_tokio::cluster::NodeTcpConfigBuilder;
use cdrs_tokio::load_balancing::RoundRobinLoadBalancingStrategy;
use cdrs_tokio::query::*;

#[tokio::main]
async fn main() {
    // Configure a cluster with a single contact point; other options keep their defaults.
    let cluster_config = NodeTcpConfigBuilder::new()
        .with_contact_point("127.0.0.1:9042".into())
        .build()
        .await
        .unwrap();
    // Build a session using round-robin load balancing.
    let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config)
        .build()
        .await
        .unwrap();

    let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_ks WITH REPLICATION = { \
                     'class' : 'SimpleStrategy', 'replication_factor' : 1 };";
    session
        .query(create_ks)
        .await
        .expect("Keyspace create error");
}

License

This project is licensed under either of

  • Apache License, Version 2.0
  • MIT license

at your option.

cdrs-tokio's People

Contributors

aerc18, alexpikalov, apezel, athre0z, byron, cibingeorge, cnd, conorbros, dependabot[bot], dfaust, doumanash, dowlandaiello, dylan-dpc, ernestas-poskus, grossws, guy9, harrydevnull, jasperav1994, jerrypnz, jumbojets, krojew, matansegal, nkconnor, npatsakula, rrichardson, rukai, stormbrew, sunng87, teburd, wagenet



cdrs-tokio's Issues

increase in size for a byte array

I am trying to insert a compressed string of ~5 MB (after compression) as a byte array (Blob). When inserted, the actual size of the row is 23 MB, which exceeds the database hard limit (16 MB).

I checked the size of the array right before sending it to the driver. Could there be an increase in size while the driver processes the data?

Thank you.

Release 6.0.0-beta.4

We have now upstreamed all our changes and would appreciate a 6.0.0-beta.4 release.
There are still many cassandra-protocol improvements to be made before a proper 6.0.0 release, though.

Protocol v5 tests

After introducing protocol v5, we need extensive end-to-end tests on a v5 cluster. Ideally, also regression checks with v4.

Such tests include running all types of queries with and without compression (everything from section 4 of https://github.com/apache/cassandra/blob/trunk/doc/native_protocol_v5.spec), listening to events, using high-level Rust-to-UDT (de)serialization, and using different authentication methods.

Note: use the latest Cassandra 4.0 and the master branch, since the changes are not published yet.

no `mstcpip` in `shared`

When I try to run any test, I get this error message:

error[E0432]: unresolved import `winapi::shared::mstcpip`
  --> C:\Users\me\.cargo\registry\src\github.com-1ecc6299db9ec823\mio-0.7.10\src\sys\windows\tcp.rs:13:5
   |
13 | use winapi::shared::mstcpip;
   |     ^^^^^^^^^^^^^^^^^^^^^^^ no `mstcpip` in `shared`

I am running Windows 10. I had no problem running https://github.com/AlexPikalov/cdrs. Do you know why this error occurs?

Why is stream_id moved out of Frame?

Looking at the Cassandra protocol spec and our current usage of frame.stream, it makes a lot of sense for stream_id to be in Frame.
What was the reasoning behind moving it out of Frame?

Issue when using parallel tokio tasks with a shared arc

The test below fails, but I think it should work. The original CDRS has exactly the same problem.

mod test {
    use cdrs_tokio::cluster::{NodeTcpConfigBuilder, ClusterTcpConfig, session};
    use cdrs_tokio::load_balancing::RoundRobin;
    use cdrs_tokio::query::QueryExecutor;
    use cdrs_tokio::authenticators::NoneAuthenticator;
    use std::sync::Arc;

    #[tokio::test]
    async fn goo() {
        let node = NodeTcpConfigBuilder::new("127.0.0.1:9042", NoneAuthenticator {})
            .build();
        let cluster_config = ClusterTcpConfig(vec![node]);
        let session = session::new(&cluster_config, RoundRobin::new()).await.unwrap();

        // Set the keyspace
        session.query("use benjamin").await.unwrap();

        // This works!
        session.query("select * from conversation").await.unwrap().get_body().unwrap().into_rows().unwrap();

        let session = Arc::new(session);

        let f = (0..=10)
            .into_iter()
            .map(|_| {
                let session = Arc::clone(&session);

                tokio::spawn(async move {
                    // Scylla complaining about not having a keyspace
                    session.query("select * from conversation").await.unwrap().get_body().unwrap().into_rows().unwrap();
                })
            })
            .collect::<Vec<_>>();

        for result in futures::future::join_all(f).await {
            assert!(result.is_ok());
        }
    }
}

Error:

assertion failed: result.is_ok()
thread 'test::goo' panicked at 'called `Result::unwrap()` on an `Err` value: Server(CDRSError { error_code: 8704, message: CString { string: "No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename" }, additional_info: Invalid(SimpleError) })', src\main.rs:27:71
stack backtrace:
   0: std::panicking::begin_panic_handler
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\std\src\panicking.rs:493
   1: core::panicking::panic_fmt
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\panicking.rs:92
   2: core::option::expect_none_failed
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\/library\core\src\option.rs:1266
   3: core::result::Result<cdrs_tokio::frame::Frame, cdrs_tokio::error::Error>::unwrap<cdrs_tokio::frame::Frame,cdrs_tokio::error::Error>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\result.rs:969
   4: fortesting::test::goo::{{closure}}::{{closure}}::{{closure}}
             at .\src\main.rs:27
   5: core::future::from_generator::{{impl}}::poll<generator-0>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\future\mod.rs:80
   6: tokio::runtime::task::core::{{impl}}::poll::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:235
   7: tokio::loom::std::unsafe_cell::UnsafeCell<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>>::with_mut<tokio::runtime::task::core::Stage<core::future::from_generator::GenFuture<generator-0>>,core::task::poll::Poll<tup
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\loom\std\unsafe_cell.rs:14
   8: tokio::runtime::task::core::CoreStage<core::future::from_generator::GenFuture<generator-0>>::poll<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\core.rs:225
   9: tokio::runtime::task::harness::poll_future::{{closure}}<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:422
  10: std::panic::{{impl}}::call_once<core::task::poll::Poll<tuple<>>,closure-0>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:322
  11: std::panicking::try::do_call<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:379
  12: std::panicking::try::do_catch<std::panic::AssertUnwindSafe<closure-0>,tuple<>>
  13: std::panicking::try<core::task::poll::Poll<tuple<>>,std::panic::AssertUnwindSafe<closure-0>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panicking.rs:343
  14: std::panic::catch_unwind<std::panic::AssertUnwindSafe<closure-0>,core::task::poll::Poll<tuple<>>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\panic.rs:396
  15: tokio::runtime::task::harness::poll_future<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:409
  16: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll_inner<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:89
  17: tokio::runtime::task::harness::Harness<core::future::from_generator::GenFuture<generator-0>, alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_sched
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\harness.rs:59
  18: tokio::runtime::task::raw::poll<core::future::from_generator::GenFuture<generator-0>,alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:104
  19: tokio::runtime::task::raw::RawTask::poll
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\raw.rs:66
  20: tokio::runtime::task::Notified<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>::run<alloc::sync::Arc<tokio::runtime::basic_scheduler::Shared>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\task\mod.rs:171
  21: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
  22: tokio::coop::with_budget::{{closure}}<tuple<>,closure-3>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:121
  23: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::try_with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:272
  24: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget>>::with<core::cell::Cell<tokio::coop::Budget>,closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\std\src\thread\local.rs:248
  25: tokio::coop::with_budget
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:114
  26: tokio::coop::budget
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\coop.rs:98
  27: tokio::runtime::basic_scheduler::{{impl}}::block_on::{{closure}}<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:208
  28: tokio::runtime::basic_scheduler::enter::{{closure}}<closure-0,tuple<>,tokio::runtime::driver::Driver>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
  29: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::basic_scheduler::Context>::set<tokio::runtime::basic_scheduler::Context,closure-0,tuple<>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\macros\scoped_tls.rs:61
  30: tokio::runtime::basic_scheduler::enter<closure-0,tuple<>,tokio::runtime::driver::Driver>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:266
  31: tokio::runtime::basic_scheduler::Inner<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:176
  32: tokio::runtime::basic_scheduler::InnerGuard<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::pin::Pin<mut core::future::from_generator::GenFuture<generator-0>*>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:405
  33: tokio::runtime::basic_scheduler::BasicScheduler<tokio::runtime::driver::Driver>::block_on<tokio::runtime::driver::Driver,core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\basic_scheduler.rs:136
  34: tokio::runtime::Runtime::block_on<core::future::from_generator::GenFuture<generator-0>>
             at C:\Users\javisser\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.2.0\src\runtime\mod.rs:450
  35: fortesting::test::goo
             at .\src\main.rs:8
  36: fortesting::test::goo::{{closure}}
             at .\src\main.rs:8
  37: core::ops::function::FnOnce::call_once<closure-0,tuple<>>
             at C:\Users\javisser\.rustup\toolchains\nightly-x86_64-pc-windows-msvc\lib\rustlib\src\rust\library\core\src\ops\function.rs:227
  38: core::ops::function::FnOnce::call_once
             at /rustc/f4eb5d9f719cd3c849befc8914ad8ce0ddcf34ed\library\core\src\ops\function.rs:227

exec_with_params is returning ErrorType::Unprepared.

Since 8.1.0 (it was working fine in 8.0.0), exec_with_params returns ErrorType::Unprepared in some circumstances.
This should not occur, because it is supposed to be handled here:

if let ErrorType::Unprepared(_) = error.ty {

edit:
I think the cause would have to involve the attempted retry also hitting an unprepared query error.
It's possible this is the fault of our proxy, but as far as I know the proxy should send the new prepare to all nodes in the cluster, so the second query failing should not be possible.
But the issue was definitely introduced by something in 8.1.0.
I will investigate further.
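If that hypothesis is right, the control flow can be sketched as below (names and types are hypothetical stand-ins, not the cdrs-tokio internals): the driver retries once after re-preparing, so if the retry also comes back Unprepared, the error surfaces to the caller.

```rust
// Hypothetical sketch of retry-on-unprepared: one re-prepare, one retry.
#[derive(Debug, PartialEq)]
enum QueryError {
    Unprepared,
    Other,
}

// Execute a query; on Unprepared, re-prepare (elided) and retry exactly once.
fn exec_with_retry<F>(mut exec: F) -> Result<(), QueryError>
where
    F: FnMut() -> Result<(), QueryError>,
{
    match exec() {
        Err(QueryError::Unprepared) => {
            // A real driver would re-prepare the statement here, then retry.
            exec()
        }
        other => other,
    }
}

fn main() {
    // A node that keeps answering Unprepared: the second failure is returned as-is,
    // which matches the behavior reported in this issue.
    let mut calls = 0;
    let result = exec_with_retry(|| {
        calls += 1;
        Err(QueryError::Unprepared)
    });
    assert_eq!(calls, 2);
    assert_eq!(result, Err(QueryError::Unprepared));
}
```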

ExponentialReconnectionPolicy panics after several attempts

May 03 17:32:17.022 ERROR main common_lib: Panic: panicked at 'attempt to shift left with overflow', /Users/alph/.cargo/registry/src/github.pie.apple.com-9396ac0411171c83/cdrs-tokio-6.2.0/src/retry/reconnection_policy.rs:119:22    
thread 'main' panicked at 'attempt to shift left with overflow', /Users/alph/.cargo/registry/src/github.pie.apple.com-9396ac0411171c83/cdrs-tokio-6.2.0/src/retry/reconnection_policy.rs:119:22
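The panic is an unchecked exponential backoff: shifting the delay factor left by the attempt count overflows once the count reaches the integer's bit width. A minimal sketch of the failure mode and a capped alternative (hypothetical code, not the actual reconnection_policy.rs):

```rust
// Naive exponential backoff: base_delay * 2^attempt.
// In debug builds, `1u64 << attempt` panics with "attempt to shift left with
// overflow" as soon as attempt >= 64.
fn delay_naive(base_ms: u64, attempt: u32) -> u64 {
    base_ms * (1u64 << attempt)
}

// Safe variant: checked_shl returns None for an out-of-range shift, and the
// multiplication saturates; the result is clamped to a maximum delay.
fn delay_capped(base_ms: u64, attempt: u32, max_ms: u64) -> u64 {
    let factor = 1u64.checked_shl(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(max_ms)
}

fn main() {
    assert_eq!(delay_naive(100, 3), 800); // fine for small attempt counts
    assert_eq!(delay_capped(100, 3, 30_000), 800);
    assert_eq!(delay_capped(100, 200, 30_000), 30_000); // no panic, just the cap
}
```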

Row to serde_json::Value

for row in rows {
    for col in &cols.metadata.col_specs {
        let value = match col.clone().col_type.id {
            ColType::Varchar | ColType::Ascii => {
                let val: String = row.get_by_name(col.name.as_str()).unwrap().expect("");
                println!("{:#?}", val)
            }
            ColType::Map => {
                let map_value: Map = row.get_by_name(col.name.as_str()).unwrap().expect("");
            }
            _ => {}
        };
    }
}

Does anyone have an idea how to convert the Map/List to serde_json::Value, assuming the struct is unknown?
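For what it's worth, a std-only sketch of one approach, assuming the column has already been read into a BTreeMap<String, String> (real code would presumably build a serde_json::Value::Object recursively per ColType instead of formatting strings):

```rust
use std::collections::BTreeMap;

// Render a flat string map as a JSON object. This skips string escaping and
// nesting; it only illustrates the "branch on column type, emit JSON" idea.
fn map_to_json(map: &BTreeMap<String, String>) -> String {
    let fields: Vec<String> = map
        .iter()
        .map(|(k, v)| format!("\"{}\":\"{}\"", k, v))
        .collect();
    format!("{{{}}}", fields.join(","))
}

fn main() {
    let mut m = BTreeMap::new();
    m.insert("a".to_string(), "1".to_string());
    m.insert("b".to_string(), "2".to_string());
    // BTreeMap iterates in key order, so the output is deterministic.
    assert_eq!(map_to_json(&m), "{\"a\":\"1\",\"b\":\"2\"}");
}
```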

Bug? ExecPager isn't exported

Hey,

I was trying to write some logic to issue a paged query using a prepared statement; however, the type that exec returns, ExecPager, isn't exported from cdrs_tokio::cluster. The struct is marked as public, but it doesn't appear to offer any interface to convert it into a type that I do have access to. My primary reason for wanting access to the type is so that I can return it as the result of a helper function. https://docs.rs/cdrs-tokio/3.0.0/src/cdrs_tokio/cluster/pager.rs.html#174

let mut pager = self.conn.paged(5);
let query = pager.exec(&prepared_query);

Am I just misunderstanding the API, or is this a quick fix?

&str in NodeTcpConfig

The NodeTcpConfig struct holds a &str instead of owning a String.
This forbids writing the following trivial code:

let nodes: Vec<NodeTcpConfig<'_>> = service_discovery.services("cassandra").iter()
    .map(|srv| {
        let address = format!("{}:{}", srv.host, srv.port);
        NodeTcpConfigBuilder::new(&address, auth.clone()).build()
        // ^ returns a value referencing data owned by the current function
    })
    .collect();

A workaround is to collect all the addresses first and then create the nodes, but I'm wondering: what is the benefit of putting a string slice into a config struct?
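For illustration, the same loop compiles once the config owns its address, because the constructor takes the formatted String by value and nothing borrows from the closure's scope (NodeConfig below is a stand-in, not the real cdrs-tokio type):

```rust
// Hypothetical owned-String config: the formatted address is moved into the
// struct, so the closure can return it without any lifetime issue.
struct NodeConfig {
    addr: String,
}

fn main() {
    // Stand-in for service_discovery.services("cassandra").
    let services = vec![("10.0.0.1", 9042), ("10.0.0.2", 9042)];

    let nodes: Vec<NodeConfig> = services
        .iter()
        .map(|(host, port)| NodeConfig {
            addr: format!("{}:{}", host, port), // owned, no borrow escapes
        })
        .collect();

    assert_eq!(nodes.len(), 2);
    assert_eq!(nodes[0].addr, "10.0.0.1:9042");
}
```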

provide Authenticators with authentication ResponseBody

For AWS Keyspaces, the authentication challenge includes a nonce that is required for signing the auth token (when using AWS credentials for Cassandra/Keyspaces access).

I think the most straightforward way to do this is to pass the auth challenge response body into Authenticator.auth_token(); you can see from the AWS SigV4 Java plugin that this is how the DataStax driver works for its auth plugins.

schema change event is never received

Consider the following code:

    let user = "cassandra";
    let password = "cassandra";
    let auth = StaticPasswordAuthenticatorProvider::new(&user, &password);
    let config = NodeTcpConfigBuilder::new()
        .with_contact_point("127.0.0.1:9042".into())
        .with_authenticator_provider(Arc::new(auth))
        .build()
        .await
        .unwrap();

    let session = TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), config)
        .build()
        .unwrap();

    let mut event_recv = session.create_event_receiver();

    sleep(Duration::from_secs(3)).await; // let the driver finish connecting to the cluster and registering for the events

    let create_ks = "CREATE KEYSPACE IF NOT EXISTS test_events_ks WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };";

    session.query(create_ks).await.unwrap();

    let event = timeout(Duration::from_secs(10), event_recv.recv())
        .await
        .unwrap()
        .unwrap();

Before bc021f4, event_recv.recv() would complete; with that commit, it now times out.

Bug in transport.rs

I think I've found a bug in transport.rs. I had some trouble finding a fix; perhaps @krojew will fare better, since you wrote the code. Here's a repo with everything needed: https://github.com/conorbros/cdrs-bug-report

In the logs you can see that the "receiving on an empty channel" error occurs frequently, and then the driver sends some garbage bytes that aren't a valid frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]. I think Cassandra itself ignores this, but I noticed it because it was causing problems in Shotover.

2022-07-12T04:43:03.166569Z DEBUG cdrs_tokio::cluster::control_connection: Establishing new control connection...
2022-07-12T04:43:03.166615Z DEBUG cdrs_tokio::cluster::topology::node: Establishing new connection to node...
2022-07-12T04:43:03.166828Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.166856Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.166861Z DEBUG cdrs_tokio::transport: writing frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]
2022-07-12T04:43:03.166886Z DEBUG cdrs_tokio::transport: writing frame: [0, 0, 2, 6a, 36, c4, d3, 7e, 77, 44]
2022-07-12T04:43:03.167801Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.167797Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.167828Z DEBUG cdrs_tokio::transport: writing frame: [21, 0, 2, 21, 62, 8d, 5, 0, 0, 2, f, 0, 0, 0, 18, 0, 0, 0, 14, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, c5, 9c, 7a, 7e]
2022-07-12T04:43:03.167823Z DEBUG cdrs_tokio::transport: writing frame: [21, 0, 2, 21, 62, 8d, 5, 0, 0, 2, f, 0, 0, 0, 18, 0, 0, 0, 14, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, 0, 63, 61, 73, 73, 61, 6e, 64, 72, 61, c5, 9c, 7a, 7e]
2022-07-12T04:43:03.234758Z DEBUG cdrs_tokio::cluster::control_connection: Established new control connection.
2022-07-12T04:43:03.234789Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.234814Z DEBUG cdrs_tokio::transport: writing frame: [30, 0, 2, 95, 3f, 3e, 5, 0, 0, 3, 7, 0, 0, 0, 27, 0, 0, 0, 1d, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 70, 65, 65, 72, 73, 5f, 76, 32, 0, 1, 0, 0, 0, 0, d1, af, 84, 6b]
2022-07-12T04:43:03.234880Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.234904Z DEBUG cdrs_tokio::transport: writing frame: [95, 0, 2, f9, 75, c5, 5, 0, 0, 3, 7, 0, 0, 0, 24, 0, 0, 0, 1a, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 6c, 6f, 63, 61, 6c, 0, 1, 0, 0, 0, 0, 5, 0, 0, 4, 7, 0, 0, 0, 5f, 0, 0, 0, 55, 53, 45, 4c, 45, 43, 54, 20, 6b, 65, 79, 73, 70, 61, 63, 65, 5f, 6e, 61, 6d, 65, 2c, 20, 74, 6f, 4a, 73, 6f, 6e, 28, 72, 65, 70, 6c, 69, 63, 61, 74, 69, 6f, 6e, 29, 20, 41, 53, 20, 72, 65, 70, 6c, 69, 63, 61, 74, 69, 6f, 6e, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 5f, 73, 63, 68, 65, 6d, 61, 2e, 6b, 65, 79, 73, 70, 61, 63, 65, 73, 0, 1, 0, 0, 0, 0, b3, 6d, 53, 19]
2022-07-12T04:43:03.236852Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.236886Z DEBUG cdrs_tokio::transport: writing frame: [30, 0, 2, 95, 3f, 3e, 5, 0, 0, 5, 7, 0, 0, 0, 27, 0, 0, 0, 1d, 53, 45, 4c, 45, 43, 54, 20, 2a, 20, 46, 52, 4f, 4d, 20, 73, 79, 73, 74, 65, 6d, 2e, 70, 65, 65, 72, 73, 5f, 76, 32, 0, 1, 0, 0, 0, 0, fb, 52, 12, 14]
2022-07-12T04:43:03.237775Z DEBUG cdrs_tokio::cluster::metadata_builder: Copying contact point. node_info=NodeInfo { host_id: 491c454d-f62e-433e-95f0-e864c89bf2c3, broadcast_rpc_address: 127.0.0.1:9043, broadcast_address: Some(192.168.160.2:7000), datacenter: "datacenter1", rack: "rack1" }
2022-07-12T04:43:03.237985Z  WARN cdrs_tokio::transport: receiving on an empty channel
2022-07-12T04:43:03.238005Z DEBUG cdrs_tokio::transport: writing frame: [3a, 0, 2, 73, 61, 83, 5, 0, 0, 6, b, 0, 0, 0, 31, 0, 3, 0, d, 53, 43, 48, 45, 4d, 41, 5f, 43, 48, 41, 4e, 47, 45, 0, d, 53, 54, 41, 54, 55, 53, 5f, 43, 48, 41, 4e, 47, 45, 0, f, 54, 4f, 50, 4f, 4c, 4f, 47, 59, 5f, 43, 48, 41, 4e, 47, 45, d2, a7, 16, 43]
result : Envelope { version: V5, direction: Response, flags: (empty), opcode: Result, stream_id: 3, tracing_id: None, warnings: [] }

Upgrade lz4

The lz4-compress crate needs to be switched to lz4_flex or lz4-fear.

Add cargo fmt and clippy checks into CI

Something like this in CI would be useful to add for code quality:

- name: Format check
  run: cargo fmt --verbose --all -- --check
- name: Clippy check
  run: cargo clippy --verbose --examples --tests -- -D warnings

Trouble with dropped connections on Session

Hello. I am not sure if this is a bug or user error. When running my test suite against a database, I have noticed that at least a couple of the tests fairly consistently fail with the error "Connection closed while waiting for response". I have tried running the tests in single-threaded mode and adding connections to the connection pool, but nothing seems to work. The number of failures varies, the tests which fail vary, etc. Since it also happens when running with "--jobs 1", I suspect it is not a threading issue.

At the same time, I see Cassandra logs saying things like "Unknown exception in client networking" and "Connection reset by peer". I am running against a locally running single-node Cassandra test cluster.

I guess my question is whether this seems to be a bug or whether I am supposed to be handling intermittent connection failures like this in my client code.

Range end index 88 out of range for slice of length 83

Hi, I got the above error in https://github.com/krojew/cdrs-tokio/blob/master/cassandra-protocol/src/token.rs#L32 when trying to run session.exec_with_values.
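For context, this panic is the generic slice-indexing failure: data[a..b] panics when b exceeds the slice length. A hypothetical illustration of the pattern and a non-panicking alternative (not the actual token.rs code):

```rust
// data[start..start + len] panics when the end index passes data.len(),
// producing exactly "range end index .. out of range for slice of length ..".
// slice::get returns None instead, letting the caller pad or bail out.
fn checked_chunk(data: &[u8], start: usize, len: usize) -> Option<&[u8]> {
    data.get(start..start + len)
}

fn main() {
    let data = vec![0u8; 83];
    // 80 + 8 = 88 > 83: indexing would panic, get() reports None.
    assert!(checked_chunk(&data, 80, 8).is_none());
    // 72 + 8 = 80 <= 83: in range.
    assert!(checked_chunk(&data, 72, 8).is_some());
}
```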

Full backtrace is:

The application panicked (crashed).
Message:  assertion failed: `(left == right)`
  left: `128`,
 right: `0`
Location: C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\backtrace-0.3.66\src\dbghelp.rs:297

  โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ” BACKTRACE โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”
                                โ‹ฎ 28 frames hidden โ‹ฎ
  29: core::slice::index::slice_end_index_len_fail_rt<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120/library\core\src\slice\index.rs:76
  30: core::slice::index::slice_end_index_len_fail<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120/library\core\src\slice\index.rs:69
  31: core::slice::index::impl$3::index<u8><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\slice\index.rs:316
  32: core::slice::index::impl$0::index<u8,core::ops::range::Range<usize> ><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\slice\index.rs:18
  33: cassandra_protocol::token::Murmur3Token::generate<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cassandra-protocol-2.0.1\src\token.rs:32
  34: core::ops::function::FnOnce::call_once<cassandra_protocol::token::Murmur3Token (*)(slice$<u8>),tuple$<slice$<u8> > ><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\ops\function.rs:248
  35: enum2$<core::option::Option<slice$<u8> > >::map<slice$<u8>,cassandra_protocol::token::Murmur3Token,cassandra_protocol::token::Murmur3Token (*)(slice$<u8>)><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\option.rs:929
  36: cdrs_tokio::load_balancing::topology_aware::impl$1::replicas_for_request::closure$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:77
  37: enum2$<core::option::Option<cassandra_protocol::token::Murmur3Token> >::or_else<cassandra_protocol::token::Murmur3Token,cdrs_tokio::load_balancing::topology_aware::impl$1::replicas_for_request::closure_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio:<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\option.rs:1377
  38: cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager>::replicas_for_request<cdrs_tokio::transport::TransportTcp,cdrs_tokio::clu<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:75
  39: cdrs_tokio::load_balancing::topology_aware::impl$0::query_plan<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\topology_aware.rs:45
  40: cdrs_tokio::load_balancing::initializing_wrapper::impl$0::query_plan<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\load_balancing\initializing_wrapper.rs:27
  41: cdrs_tokio::cluster::session::Session<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio::transport::TransportTcp,<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:574
  42: cdrs_tokio::cluster::session::impl$1::send_envelope::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_tokio<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:610
  43: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::send_envelope::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topolog<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  44: cdrs_tokio::cluster::session::impl$1::exec_with_params::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_to<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:276
  45: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::exec_with_params::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topo<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  46: cdrs_tokio::cluster::session::impl$1::exec_with_values::async_fn$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topology_aware::TopologyAwareLoadBalancingStrategy<cdrs_to<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\cdrs-tokio-7.0.2\src\cluster\session.rs:385
  47: core::future::from_generator::impl$1::poll<enum2$<cdrs_tokio::cluster::session::impl$1::exec_with_values::async_fn_env$0<cdrs_tokio::transport::TransportTcp,cdrs_tokio::cluster::tcp_connection_manager::TcpConnectionManager,cdrs_tokio::load_balancing::topo<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  48: server_stats_v2::database::impl$1::get_keys::async_fn$0<unknown>
      at C:\Users\marce\IdeaProjects\server_stats_v2\src\database.rs:196
  49: core::future::from_generator::impl$1::poll<enum2$<server_stats_v2::database::impl$1::get_keys::async_fn_env$0> ><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  50: server_stats_v2::server::key::async_fn$0<unknown>
      at C:\Users\marce\IdeaProjects\server_stats_v2\src\server.rs:93
  51: core::future::from_generator::impl$1::poll<enum2$<server_stats_v2::server::key::async_fn_env$0> ><unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  52: axum::handler::impl$6::call::async_block$0<core::future::from_generator::GenFuture<enum2$<server_stats_v2::server::key::async_fn_env$0> > (*)(axum::extract::path::Path<alloc::string::String>,axum::extension::Extension<alloc::sync::Arc<server_stats_v2::dat<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\handler\mod.rs:250
  53: core::future::from_generator::impl$1::poll<enum2$<axum::handler::impl$6::call::async_block_env$0<core::future::from_generator::GenFuture<enum2$<server_stats_v2::server::key::async_fn_env$0> > (*)(axum::extract::path::Path<alloc::string::String>,axum::exte<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\mod.rs:91
  54: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::marker::Send>,alloc::all<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
  55: futures_util::future::future::map::impl$2::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,co<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-util-0.3.25\src\future\future\map.rs:55
  56: futures_util::future::future::impl$15::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\futures-util-0.3.25\src\lib.rs:91
  57: axum::handler::future::impl$4::poll<core::pin::Pin<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> > > >,core::marker::<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\macros.rs:42
  58: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,enum2$<core::result::Result<http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
  59: tower::util::oneshot::impl$3::poll<tower::util::boxed_clone::BoxCloneService<http::request::Request<hyper::body::body::Body>,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<cor<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-0.4.13\src\util\oneshot.rs:97
  60: axum::routing::route::impl$5::poll<hyper::body::body::Body,enum2$<core::convert::Infallible> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\routing\route.rs:150
  61: tower_http::map_response_body::impl$6::poll<axum::routing::route::RouteFuture<hyper::body::body::Body,enum2$<core::convert::Infallible> >,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> (*)(http_body::combinato<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-http-0.3.4\src\map_response_body.rs:204
  62: tower_http::map_response_body::impl$6::poll<tower_http::map_response_body::ResponseFuture<axum::routing::route::RouteFuture<hyper::body::body::Body,enum2$<core::convert::Infallible> >,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axu<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-http-0.3.4\src\map_response_body.rs:204
  63: core::future::future::impl$1::poll<alloc::boxed::Box<dyn$<core::future::future::Future<assoc$<Output,enum2$<core::result::Result<http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\future\future.rs:124
  64: tower::util::oneshot::impl$3::poll<tower::util::boxed_clone::BoxCloneService<http::request::Request<hyper::body::body::Body>,http::response::Response<http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> >,enum2$<cor<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tower-0.4.13\src\util\oneshot.rs:97
  65: axum::routing::route::impl$5::poll<hyper::body::body::Body,enum2$<core::convert::Infallible> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\axum-0.5.17\src\routing\route.rs:150
  66: hyper::proto::h1::dispatch::impl$6::poll_msg<axum::routing::Router<hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:491
  67: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:297
  68: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:161
  69: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:137
  70: hyper::proto::h1::dispatch::Dispatcher<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server::t<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:120
  71: hyper::proto::h1::dispatch::impl$1::poll<hyper::proto::h1::dispatch::Server<axum::routing::Router<hyper::body::body::Body>,hyper::body::body::Body>,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,hyper::server:<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\proto\h1\dispatch.rs:424
  72: hyper::server::conn::impl$6::poll<hyper::server::tcp::addr_stream::AddrStream,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,axum::routing::Router<hyper::body::body::Body>,enum2$<hyper::common::exec::Exec> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\conn.rs:952
  73: hyper::server::conn::upgrades::impl$1::poll<hyper::server::tcp::addr_stream::AddrStream,http_body::combinators::box_body::UnsyncBoxBody<bytes::bytes::Bytes,axum_core::error::Error>,axum::routing::Router<hyper::body::body::Body>,enum2$<hyper::common::exec:<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\conn.rs:1012
  74: hyper::server::server::new_svc::impl$1::poll<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyper::body::body::Body>,enum2$<core::c<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\hyper-0.14.22\src\server\server.rs:741
  75: tokio::runtime::task::core::impl$3::poll::closure$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routi<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:184
  76: tokio::loom::std::unsafe_cell::UnsafeCell<enum2$<tokio::runtime::task::core::Stage<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\loom\std\unsafe_cell.rs:14
  77: tokio::runtime::task::core::CoreStage<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyp<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:174
  78: tokio::runtime::task::harness::poll_future::closure$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::rou<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:480
  79: core::panic::unwind_safe::impl$23::call_once<enum2$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_ma<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\panic\unwind_safe.rs:271
  80: std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::Into<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:492
  81: std::panicking::try::do_catch<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::impl$1::complete::closure_env$0<core::future::from_generator::GenFuture<enum2$<cdrs_tokio::cluster::control_connection::impl$0::process_events::async_b<unknown>
      at <unknown source file>:<unknown line>
  82: std::panicking::try<enum2$<core::task::poll::Poll<tuple$<> > >,core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:456
  83: std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMake<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panic.rs:137
  84: tokio::runtime::task::harness::poll_future<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Route<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:468
  85: tokio::runtime::task::harness::Harness<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hy<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:104
  86: tokio::runtime::task::harness::Harness<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hy<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:57
  87: tokio::runtime::task::raw::poll<hyper::server::server::new_svc::NewSvcTask<hyper::server::tcp::addr_stream::AddrStream,axum::routing::into_make_service::IntoMakeServiceFuture<axum::routing::Router<hyper::body::body::Body> >,axum::routing::Router<hyper::bo<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:194
  88: tokio::runtime::task::raw::RawTask::poll<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:134
  89: tokio::runtime::task::LocalNotified<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::worker::Shared> >::run<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::worker::Shared> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\mod.rs:385
  90: tokio::runtime::scheduler::multi_thread::worker::impl$1::run_task::closure$0<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:444
  91: tokio::coop::with_budget::closure$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,alloc::alloc::Global>,tuple$<> > >,tokio::runtime::scheduler::multi_thread::worker::impl$1::run_task::closure_env$0><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:102
  92: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::try_with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure_env$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Cor<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\thread\local.rs:445
  93: std::thread::local::LocalKey<core::cell::Cell<tokio::coop::Budget> >::with<core::cell::Cell<tokio::coop::Budget>,tokio::coop::with_budget::closure_env$0<enum2$<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,al<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\thread\local.rs:421
  94: tokio::coop::with_budget<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:95
  95: tokio::coop::budget<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\coop.rs:72
  96: tokio::runtime::scheduler::multi_thread::worker::Context::run_task<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:420
  97: tokio::runtime::scheduler::multi_thread::worker::Context::run<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:387
  98: tokio::runtime::scheduler::multi_thread::worker::run::closure$0<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:372
  99: tokio::macros::scoped_tls::ScopedKey<tokio::runtime::scheduler::multi_thread::worker::Context>::set<tokio::runtime::scheduler::multi_thread::worker::Context,tokio::runtime::scheduler::multi_thread::worker::run::closure_env$0,tuple$<> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\macros\scoped_tls.rs:61
  100: tokio::runtime::scheduler::multi_thread::worker::run<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:369
  101: tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure$0<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\scheduler\multi_thread\worker.rs:348
  102: tokio::runtime::blocking::task::impl$2::poll<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0,tuple$<> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\task.rs:42
  103: tokio::runtime::task::core::impl$3::poll::closure$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> ><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:184
  104: tokio::loom::std::unsafe_cell::UnsafeCell<enum2$<tokio::runtime::task::core::Stage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> > > >::with_mut<enum2$<tokio::runtime::task::co<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\loom\std\unsafe_cell.rs:14
  105: tokio::runtime::task::core::CoreStage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0> >::poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\core.rs:174
  106: tokio::runtime::task::harness::poll_future::closure$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:480
  107: core::panic::unwind_safe::impl$23::call_once<enum2$<core::task::poll::Poll<tuple$<> > >,tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch:<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\core\src\panic\unwind_safe.rs:271
  108: std::panicking::try::do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:492
  109: std::panicking::try::do_catch<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$1<tokio::runtime::blocking::task::BlockingTask<tokio::net::addr::impl$17::to_socket_addrs::closure_env$0>,tokio::runtime::bloc<unknown>
      at <unknown source file>:<unknown line>
  110: std::panicking::try<enum2$<core::task::poll::Poll<tuple$<> > >,core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worke<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panicking.rs:456
  111: std::panic::catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::closure_env$0<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,toki<unknown>
      at /rustc/897e37553bba8b42751c67658967889d11ecd120\library\std\src\panic.rs:137
  112: tokio::runtime::task::harness::poll_future<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:468
  113: tokio::runtime::task::harness::Harness<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule>::poll_inner<tokio::runtime::blocking::tas<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:104
  114: tokio::runtime::task::harness::Harness<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule>::poll<tokio::runtime::blocking::task::Blo<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\harness.rs:57
  115: tokio::runtime::task::raw::poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::impl$0::launch::closure_env$0>,tokio::runtime::blocking::schedule::NoopSchedule><unknown>    
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:194
  116: tokio::runtime::task::raw::RawTask::poll<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\raw.rs:134
  117: tokio::runtime::task::UnownedTask<tokio::runtime::blocking::schedule::NoopSchedule>::run<tokio::runtime::blocking::schedule::NoopSchedule><unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\task\mod.rs:422
  118: tokio::runtime::blocking::pool::Task::run<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:111
  119: tokio::runtime::blocking::pool::Inner::run<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:346
  120: tokio::runtime::blocking::pool::impl$5::spawn_thread::closure$0<unknown>
      at C:\Users\marce\.cargo\registry\src\github.com-1ecc6299db9ec823\tokio-1.21.2\src\runtime\blocking\pool.rs:321
                                ⋮ 15 frames hidden ⋮

Run with COLORBT_SHOW_HIDDEN=1 environment variable to disable frame filtering.
Run with RUST_BACKTRACE=full to include source snippets.
thread panicked while panicking. aborting.
error: process didn't exit successfully: `target\debug\server_stats_v2.exe` (exit code: 0xc0000409, STATUS_STACK_BUFFER_OVERRUN)

Cannot select Authenticator at runtime

  1. The Authenticator trait requires Self: Sized because it is Clone, so it is not possible to have a Box<dyn Authenticator>.
  2. A concrete Authenticator impl type is tied to the Session type.

This means you can't abstract over the authenticator at runtime, e.g. initialize a Session from authentication config known only at runtime and then let all the other code use a single Session type.
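One workaround that stays within the Self: Sized constraint is enum-based static dispatch: wrap every concrete authenticator in an enum and implement the trait once for the enum. The sketch below uses illustrative names (Authenticator, NoneAuthenticator, PasswordAuthenticator, from_config), not cdrs-tokio's actual types; only the dispatch pattern is the point.

```rust
// Sketch: a runtime-selectable authenticator under a Clone-bound trait.
// Box<dyn Authenticator> is impossible (Clone => Self: Sized), so we wrap
// the concrete impls in an enum and match on it.

trait Authenticator: Clone {
    fn auth_token(&self) -> Vec<u8>;
}

#[derive(Clone)]
struct NoneAuthenticator;

#[derive(Clone)]
struct PasswordAuthenticator {
    user: String,
    pass: String,
}

impl Authenticator for NoneAuthenticator {
    fn auth_token(&self) -> Vec<u8> {
        Vec::new()
    }
}

impl Authenticator for PasswordAuthenticator {
    // SASL PLAIN-style token: \0user\0pass (illustrative only).
    fn auth_token(&self) -> Vec<u8> {
        let mut token = vec![0u8];
        token.extend_from_slice(self.user.as_bytes());
        token.push(0);
        token.extend_from_slice(self.pass.as_bytes());
        token
    }
}

// One enum covers every authenticator known at compile time; which variant
// gets constructed can be decided at runtime from configuration.
#[derive(Clone)]
enum AnyAuthenticator {
    None(NoneAuthenticator),
    Password(PasswordAuthenticator),
}

impl Authenticator for AnyAuthenticator {
    fn auth_token(&self) -> Vec<u8> {
        match self {
            AnyAuthenticator::None(a) => a.auth_token(),
            AnyAuthenticator::Password(a) => a.auth_token(),
        }
    }
}

fn from_config(user: Option<String>, pass: Option<String>) -> AnyAuthenticator {
    match (user, pass) {
        (Some(user), Some(pass)) => {
            AnyAuthenticator::Password(PasswordAuthenticator { user, pass })
        }
        _ => AnyAuthenticator::None(NoneAuthenticator),
    }
}

fn main() {
    let auth = from_config(Some("cassandra".into()), Some("cassandra".into()));
    // \0 + "cassandra" + \0 + "cassandra" = 20 bytes
    assert_eq!(auth.auth_token().len(), 20);
}
```

The downside is that every authenticator must be known at compile time, but the rest of the code can then use one Session type parameterized by AnyAuthenticator.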

TLS Example

Hello, I was wondering if it was possible to add an example of TLS config? Thank you!

error UnexpectedWriteType("CAS") with multiple tokio tasks

I am getting the error UnexpectedWriteType("CAS") when doing batch inserts from multiple tasks using the same Session object.

It fails intermittently on a three node cluster with Cassandra version [cqlsh 6.0.0 | Cassandra 4.0.5 | CQL spec 3.4.5 | Native protocol v5]. It works consistently with a single task.

On a local Cassandra instance in Docker (version 4.0.6), it works consistently with multiple tasks.

Session can hit errors if used immediately

When a SessionBuilder's (e.g. TcpSessionBuilder's) build() method is called, a Session is returned.
However, there are still background tasks that need to complete before the session can actually be used.
Two examples I have hit are:

  1. Delayed enabling of event processing - causing the broadcast::Receiver returned by create_event_receiver to miss events
  2. Delayed population of ClusterMetadataManager - causing prepared statements to fail with: Cannot find node 127.0.0.1:9042 for statement re-preparation!

I think a reasonable solution would be a tokio::sync::oneshot channel that is signaled when the session tasks have finished initializing, so the build() method can await it.

This can be worked around locally by adding sleeps, but that is clearly not ideal.
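The suggestion above, as a hedged sketch; the names TcpSessionBuilder::build, start_session, and the readiness wiring are hypothetical stand-ins for cdrs-tokio's actual internals:

```rust
use tokio::sync::oneshot;

impl TcpSessionBuilder {
    pub async fn build(self) -> Session {
        let (ready_tx, ready_rx) = oneshot::channel::<()>();

        // Hypothetical: the spawned session tasks send on ready_tx once event
        // processing is enabled and ClusterMetadataManager is populated.
        let session = self.start_session(ready_tx);

        // build() now resolves only when the session is actually usable,
        // removing the need for caller-side sleeps.
        ready_rx
            .await
            .expect("session initialization tasks were dropped");
        session
    }
}
```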

rustc stack overflow when deriving IntoCdrsValue and TryFromRow if struct contains static string reference

My struct contains a static string reference; when I try to compile this piece of code, the Rust compiler crashes with a stack overflow. If references are not supported, the macro should emit a warning or error instead of expanding to code that makes rustc crash.

#[derive(IntoCdrsValue, TryFromRow, Clone, Debug, Serialize, Deserialize)]
pub struct EventData {
    pub task_id: String,
    pub agent: String,
    pub event_name: &'static str,
    pub event_time: u32,
    pub interaction_id: String,
}

When I convert the above event_name to an owned type such as String, compilation succeeds.
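For reference, the owned-field variant that compiles is shown below; the cdrs derives (IntoCdrsValue, TryFromRow) and serde derives are omitted here, since only the field-type change matters for the workaround:

```rust
// Workaround: store an owned String instead of &'static str.
#[derive(Clone, Debug)]
pub struct EventData {
    pub task_id: String,
    pub agent: String,
    pub event_name: String, // was: &'static str
    pub event_time: u32,
    pub interaction_id: String,
}

fn main() {
    let e = EventData {
        task_id: "t1".into(),
        agent: "a1".into(),
        event_name: "created".into(),
        event_time: 0,
        interaction_id: "i1".into(),
    };
    assert_eq!(e.event_name, "created");
}
```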

Additional information

rustc 1.68.0-nightly (270c94e48 2022-12-28)

async-fn-in-trait is enabled.
M1 Pro, ARM processor

Prepared statements bug.

Hello there!

Prepared statements are created only on a single node. With a fixed set of observed nodes and load balancing enabled, this causes query errors:

ERROR cdrs_tokio::transport: Transport error! error=Server error: ErrorBody { error_code: 9472, message: "Prepared query with ID dde1cb1db179403026499e16ff7ef1f5 not found (either the query was not prepared on this host (maybe the host has been restarted?) or you have prepared too many queries and it has been evicted from the internal cache)", additional_info:
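A common driver-side mitigation, shown here as a hedged sketch with hypothetical helper names (exec_reprepare_once, prepare_on_node, body.node()), not cdrs-tokio's actual API: when the server returns the Unprepared error (code 9472 = 0x2500), re-prepare the statement on the node that reported it and retry once.

```rust
// Hedged sketch: retry a prepared statement once after re-preparing it on
// the node that returned the Unprepared error.
async fn exec_reprepare_once(
    session: &Session,
    stmt: &PreparedStatement,
) -> Result<ResponseBody, Error> {
    match session.exec(stmt).await {
        Err(Error::Server(body)) if body.error_code == 0x2500 => {
            // Hypothetical helper: prepare the query on the failing host,
            // so the statement ID exists in that node's cache.
            session.prepare_on_node(stmt.query(), body.node()).await?;
            session.exec(stmt).await
        }
        other => other,
    }
}
```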

Support for tracing information

Hello 👋
Is there any way to get tracing information when making Cassandra queries with this crate? I would have hoped to see spans showing the Cassandra connections and query executions, but I can't find any way to get them. Am I missing something, or is this feature not available as of now?

Paging

I was wondering if there is any way of using paging as described here: https://docs.datastax.com/en/developer/java-driver/4.14/manual/core/paging/ ?
I have a select query that returns many rows, totaling about 60 MB. The performance is slow (~0.8 s), and I think this is a paging issue.

let response = session
        .query_with_params(
            SELECT_CQL,
            StatementParamsBuilder::new()
                .with_values(query_values!(id, month))
                .build(),
        )
        .await?
        .response_body()?
        .into_rows();
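At the protocol level, paging works by setting a page_size on the query and feeding the server-returned paging_state back into the next request. A hedged sketch of that loop is below; the methods with_page_size, with_paging_state, and paging_state() are assumptions modeled on the CQL protocol's query parameters, not verified cdrs-tokio API:

```rust
// Hedged sketch of manual paging with StatementParamsBuilder.
let mut paging_state = None;
loop {
    let mut builder = StatementParamsBuilder::new()
        .with_values(query_values!(id, month))
        .with_page_size(5000); // fetch ~5k rows per round trip
    if let Some(state) = paging_state.take() {
        // Resume where the previous page ended.
        builder = builder.with_paging_state(state);
    }

    let body = session
        .query_with_params(SELECT_CQL, builder.build())
        .await?
        .response_body()?;

    if let Some(rows) = body.clone().into_rows() {
        process(rows);
    }

    // Continue while the server reports more pages.
    match body.paging_state() {
        Some(state) => paging_state = Some(state),
        None => break,
    }
}
```

With paging, each round trip transfers a bounded chunk instead of the full 60 MB result, which usually improves time-to-first-row even if total transfer time stays similar.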

send_frame locks up on sending large frames >0x20000

Hi there,

I switched from cdrs to cdrs-tokio a few days ago and started noticing some app lock-ups, usually when a large frame is being sent. This is similar to the issue fixed in #4, but at a larger threshold: it now happens when sending frames larger than ~0x20000 bytes.

Sorry, cannot provide a test case. I'll try to track it down.

Cheers,
-alph

Scope of protocol functionality

It seems this project's fork predates cassandra-proto.

I am in need of a cassandra protocol crate that can support response encoding and query decoding for the purposes of implementing a proxy.
Would that be out of scope of cdrs-tokio's protocol functionality?
Would you be interested in pulling cdrs-tokio's protocol functionality into a cassandra-proto like crate? (I am happy to put in the work to make this happen)

I am currently looking at getting cassandra-proto cleaned up and improved at https://github.com/shotover/cassandra-proto
But this crate is fairly active so it would be a shame to duplicate work.
I would be happy to collaborate in any way that makes sense for your project: possibly a subcrate in this repo or possibly another separate repo.

Comparison to scylla-rust-driver

I recently came across https://github.com/scylladb/scylla-rust-driver
And also saw https://www.reddit.com/r/rust/comments/pfuwhf/help_wanted_cdrstokio_road_to_performance_and/
So it seems you have seen scylla-rust-driver and that spurred a lot of the cleanup of cdrs's performance mistakes.

After evaluating the performance of scylla-rust-driver, it still seems to considerably outperform cdrs-tokio master, in both full-scale usage and protocol serialization/deserialization.

But you clearly seem dedicated to cdrs-tokio instead of jumping over to scylla-rust-driver, so what do you think cdrs-tokio does better than scylla-rust-driver in order for you to keep working on it?

Error: List should not be empty

Hi team,

I'd like to raise this question again. It was asked a while ago in AlexPikalov/cdrs#282, but I think it deserves better handling.

The question is basically about allowing container fields like Vec<T> in the structs that make use of TryFromUdt or TryFromRow. Currently, if a table contains an empty list, try_from_udt (try_from_row) will fail with the error above.

The proposed solution used to be to wrap all the containers in Option, but that is quite inconvenient and looks like overkill.
An empty list is a valid construct and I do not see a point in requiring that wrapping from the application-level logic.
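The Option workaround in miniature; TaskRow and tags_of are illustrative names, and the TryFromRow derive is omitted since only the wrapping pattern is shown:

```rust
// Workaround: model the list column as Option<Vec<T>> and normalize with
// unwrap_or_default(), so an empty/missing list becomes an empty Vec.
#[derive(Debug, Clone)]
struct TaskRow {
    tags: Option<Vec<String>>, // an empty list currently fails to deserialize as Vec<String>
}

fn tags_of(row: TaskRow) -> Vec<String> {
    row.tags.unwrap_or_default()
}

fn main() {
    let empty = TaskRow { tags: None };
    assert!(tags_of(empty).is_empty());

    let full = TaskRow { tags: Some(vec!["urgent".to_string()]) };
    assert_eq!(tags_of(full), vec!["urgent".to_string()]);
}
```

This keeps application code compiling today, at the cost of every call site doing the unwrap; allowing empty collections in the derive itself would remove that boilerplate.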

In our fork of the repo we have a fix that allows empty collections. I'd like to check whether there are any objections to upstreaming it.

Thanks,
-alph

Keyspace setup sends Transport into an infinite loop

Hello :-)

I tried to use TcpSessionBuilder with .with_keyspace, but observed high CPU and network usage.

When I started digging, I found that the start_reading and start_writing methods are stuck in an infinite loop.

It looks like this is because, in the start_reading method, Cassandra keeps returning a response with a SetKeyspace result:

[cdrs-tokio-6.2.0/cdrs-tokio/src/transport.rs:320] &body = Result(
    SetKeyspace(
        BodyResResultSetKeyspace {
            body: "my_keyspace",
        },
    ),
)

When I removed .with_keyspace, everything worked fine.
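The busy-loop symptom described above can be illustrated with a tiny stdlib-only sketch: if a read loop keeps seeing the same frame without ever consuming it, it spins at full CPU. The function and its behavior are purely illustrative, not the actual transport code:

```rust
// Simulates a reader draining frames from a buffer. If the frame is never
// consumed after decoding, the loop re-reads it indefinitely (capped here
// by a guard so the sketch terminates). Returns the iteration count.
fn process_frames(buffer: &mut Vec<u8>, consume_after_decode: bool) -> usize {
    let mut iterations = 0;
    while !buffer.is_empty() && iterations < 1000 {
        iterations += 1;
        if consume_after_decode {
            buffer.clear(); // correct path: frame consumed, loop exits
        }
        // buggy path: frame stays in the buffer and is decoded again
    }
    iterations
}

fn main() {
    // Consuming the frame exits after one pass.
    assert_eq!(process_frames(&mut vec![1, 2, 3], true), 1);
    // Not consuming it spins until the guard trips (real code spins forever).
    assert_eq!(process_frames(&mut vec![1, 2, 3], false), 1000);
    println!("ok");
}
```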

High CPU usage for cdrs_tokio::transport::AsyncTransport::start_processing

I have set up a Cassandra cluster of 6 pods using the Helm charts, and I am connecting to the cluster via one known node. The application works fine, but occasionally its CPU usage climbs to 100% and it becomes unresponsive. I sampled CPU usage for 1 second during one of the peaks and found that 99% of the CPU time was spent in cdrs_tokio::transport::AsyncTransport::start_processing; I have attached the complete flamegraph SVG for your reference here. I also observe that after a few hours of full CPU usage, it does drop to around 50% (but that is still high).


The flamegraph was captured without debug symbols, since the issue only happens occasionally. Below is the code that demonstrates how I make the connection; here CASSANDRA_HOST is the name of the Kubernetes service for the application.

pub async fn create_cassandra_session() -> anyhow::Result<CassandraSession> {
    // Contact point defaults to localhost:9042 when the env vars are unset.
    let url = format!(
        "{}:{}",
        std::env::var("CASSANDRA_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()),
        std::env::var("CASSANDRA_PORT").unwrap_or_else(|_| "9042".to_string())
    );
    tracing::info!("Connecting to cassandra at {url}");

    // Use password authentication only when both credentials are present.
    let cluster_config = if let (Some(username), Some(password)) = (
        std::env::var("CASSANDRA_USERNAME").ok(),
        std::env::var("CASSANDRA_PASSWORD").ok(),
    ) {
        tracing::info!("Cassandra static credentials read from environment");
        let authenticator = StaticPasswordAuthenticatorProvider::new(username, password);
        NodeTcpConfigBuilder::new()
            .with_contact_point(url.into())
            .with_authenticator_provider(Arc::new(authenticator))
            .build()
            .await?
    } else {
        tracing::info!("Cassandra is connecting without credentials");
        NodeTcpConfigBuilder::new()
            .with_contact_point(url.into())
            .build()
            .await?
    };

    Ok(TcpSessionBuilder::new(RoundRobinLoadBalancingStrategy::new(), cluster_config).build()?)
}
