tokio-tungstenite's Issues

SSL23_GET_SERVER_HELLO error with the new 0.10 release

code to reproduce:

use tokio_tungstenite::connect_async;

#[tokio::main]
async fn main() {
    let url = url::Url::parse(&"wss://www.deribit.com/ws/api/v2").unwrap();

    connect_async(url).await.expect("Failed to connect");
}

Compile with the tls feature enabled.

Full error

thread 'main' panicked at 'Failed to connect: Tls(Ssl(Error { code: ErrorCode(1), cause: Some(Ssl(ErrorStack([Error { code: 336031996, library: "SSL routines", function: "SSL23_GET_SERVER_HELLO", reason: "unknown protocol", file: "s23_clnt.c", line: 794 }]))) }, X509VerifyResult { code: 0, error: "ok" }))', src/libcore/result.rs:1188:5

Advantages / Motivation

Could you provide some information on the advantages of tokio-tungstenite compared to websocket, ws, and tk-http? What was your motivation for building a new library instead of using an existing one?
Many thanks in advance!

assertion failed when testing examples/server.rs

To reproduce, use the Simple WebSocket Client chrome extension and connect then disconnect.

> RUST_BACKTRACE=full cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
     Running `target/debug/test-tung-server`
Listening on: 127.0.0.1:8080
New WebSocket connection: 127.0.0.1:50700
thread 'main' panicked at 'assertion failed: `(left == right)` (left: `1`, right: `0`)', /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:26
stack backtrace:
   0:     0x55667b188153 - std::sys::imp::backtrace::tracing::imp::unwind_backtrace::h0c49f46a3545f908
                               at /checkout/src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1:     0x55667b1843b4 - std::sys_common::backtrace::_print::hcef39a9816714c4c
                               at /checkout/src/libstd/sys_common/backtrace.rs:71
   2:     0x55667b18a287 - std::panicking::default_hook::{{closure}}::h7c3c94835e02f846
                               at /checkout/src/libstd/sys_common/backtrace.rs:60
                               at /checkout/src/libstd/panicking.rs:355
   3:     0x55667b189e0b - std::panicking::default_hook::h0bf7bc3112fb107d
                               at /checkout/src/libstd/panicking.rs:371
   4:     0x55667b18a6fb - std::panicking::rust_panic_with_hook::ha27630c950090fec
                               at /checkout/src/libstd/panicking.rs:549
   5:     0x55667b18a5d4 - std::panicking::begin_panic::heb97fa3158b71158
                               at /checkout/src/libstd/panicking.rs:511
   6:     0x55667b18a509 - std::panicking::begin_panic_fmt::h8144403278d84748
                               at /checkout/src/libstd/panicking.rs:495
   7:     0x55667b0f723e - tungstenite::protocol::frame::frame::apply_mask_aligned32::h2e9587b64b443a55
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:26
   8:     0x55667b09bfa6 - tungstenite::protocol::frame::frame::Frame::remove_mask::{{closure}}::h464b933611967a31
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:224
   9:     0x55667b0a3fc5 - <core::option::Option<T>>::and_then::hc6cbc7d04a997358
                               at /checkout/src/libcore/option.rs:587
  10:     0x55667b09bf35 - tungstenite::protocol::frame::frame::Frame::remove_mask::h7d7a272641f1a4d0
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:221
  11:     0x55667b0c71e6 - <tungstenite::protocol::WebSocket<Stream>>::read_message_frame::hdce73b2af0b2291f
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/mod.rs:197
  12:     0x55667b0c4939 - <tungstenite::protocol::WebSocket<Stream>>::read_message::he6e38db2dd59fe22
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/mod.rs:93
  13:     0x55667b0e25d4 - <tokio_tungstenite::WebSocketStream<T> as futures::stream::Stream>::poll::hdfde1f9e11380517
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.1.2/src/lib.rs:97
  14:     0x55667b0e285a - <futures::stream::split::SplitStream<S> as futures::stream::Stream>::poll::hb4a6de1455b07ea8
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/stream/split.rs:14
  15:     0x55667b097913 - <futures::stream::for_each::ForEach<S, F, U> as futures::future::Future>::poll::hdc98d641d3de2017
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/stream/for_each.rs:45
  16:     0x55667b0e21f3 - <futures::future::map::Map<A, F> as futures::future::Future>::poll::h8cdad3f417638717
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/map.rs:30
  17:     0x55667b0e4516 - <futures::future::map_err::MapErr<A, F> as futures::future::Future>::poll::h8c67f452f112d092
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/map_err.rs:30
  18:     0x55667b0e2efa - <futures::future::select::Select<A, B> as futures::future::Future>::poll::h57c6ee48948d3aac
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/select.rs:48
  19:     0x55667b0ceac5 - <futures::future::chain::Chain<A, B, C>>::poll::h6a75eceb7124d4aa
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/chain.rs:26
  20:     0x55667b0e535c - <futures::future::then::Then<A, B, F> as futures::future::Future>::poll::h5def7d01a5ad2636
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/then.rs:32
  21:     0x55667b13e551 - <alloc::boxed::Box<F> as futures::future::Future>::poll::hb3767344ebe98fd2
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/mod.rs:106
  22:     0x55667b127c7c - <futures::task_impl::Spawn<F>>::poll_future::{{closure}}::h48649b1ca7a1bffa
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:337
  23:     0x55667b12807e - <futures::task_impl::Spawn<T>>::enter::{{closure}}::hd669513906380924
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:484
  24:     0x55667b140843 - futures::task_impl::set::{{closure}}::hf10679f8e41b220c
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:61
  25:     0x55667b1298c7 - <std::thread::local::LocalKey<T>>::with::h74b2509b934fff98
                               at /checkout/src/libstd/thread/local.rs:253
  26:     0x55667b1405ef - futures::task_impl::set::h50f076371bd50260
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:54
  27:     0x55667b127fce - <futures::task_impl::Spawn<T>>::enter::hec76d727940c9640
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:484
  28:     0x55667b127c07 - <futures::task_impl::Spawn<F>>::poll_future::hb5a04041cb6e0824
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:337
  29:     0x55667b14c322 - tokio_core::reactor::Core::dispatch_task::{{closure}}::h49b3dd2d7da2a25e
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:366
  30:     0x55667b124194 - <scoped_tls::ScopedKey<T>>::set::hdf6c715dc2147ce3
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
  31:     0x55667b14be68 - tokio_core::reactor::Core::dispatch_task::h2ca8f82a3a2b061c
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:366
  32:     0x55667b14b3b5 - tokio_core::reactor::Core::dispatch::h0252fa03a28cc3c2
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:324
  33:     0x55667b14af1b - tokio_core::reactor::Core::poll::ha5e0c2568b9e79a4
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:312
  34:     0x55667b09a2a8 - tokio_core::reactor::Core::run::he9daa5ab6dd0f82d
                               at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:249
  35:     0x55667b0e5974 - test_tung_server::main::hc8b7e99d2f69155d
                               at /home/bbigras/dev/rust/tests/test-tung-server/src/main.rs:104
  36:     0x55667b18a3f5 - std::panicking::try::do_call::h689a21caeeef92aa
                               at /checkout/src/libcore/ops.rs:2606
                               at /checkout/src/libstd/panicking.rs:454
  37:     0x55667b1915da - __rust_maybe_catch_panic
                               at /checkout/src/libpanic_unwind/lib.rs:98
  38:     0x55667b18ae9a - std::rt::lang_start::hf63d494cb7dd034c
                               at /checkout/src/libstd/panicking.rs:433
                               at /checkout/src/libstd/panic.rs:361
                               at /checkout/src/libstd/rt.rs:57
  39:     0x55667b0e7302 - main
  40:     0x7f506f505510 - __libc_start_main
  41:     0x55667b096159 - _start
  42:                0x0 - <unknown>

rustc 1.17.0-nightly (7846dbe0c 2017-03-26)
futures 0.1.12
tokio-core 0.1.6
tokio-tungstenite 0.1.2
tungstenite 0.2.0
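
For context, the panic is raised in the frame unmasking path (`Frame::remove_mask`). Per RFC 6455, masking is a byte-wise XOR of the payload with a repeating 4-byte key, and applying it twice restores the original. The following is a minimal byte-at-a-time sketch of that operation, not tungstenite's implementation; the name `apply_mask_aligned32` in the trace suggests the library uses a word-at-a-time variant, which is an assumption here.

```rust
/// Applies (or removes) a WebSocket mask in place.
/// Masking is its own inverse: each payload byte is XORed with key[i % 4].
fn apply_mask(payload: &mut [u8], key: [u8; 4]) {
    for (i, byte) in payload.iter_mut().enumerate() {
        *byte ^= key[i % 4];
    }
}
```

A byte-wise loop like this has no alignment preconditions, so it can serve as a reference when checking an optimized version.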

client: ping support

If I use tokio-tungstenite as a client, my application seems to reply to pings from the server, but I was wondering whether tokio-tungstenite (as a client) supports sending pings and dropping the connection on ping timeouts.

I'm not sure whether you want this to be tokio-tungstenite's job or the users' job, but I think users' code would be a lot simpler if tokio-tungstenite had ping support.
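
To make the request concrete, here is a rough sketch of the bookkeeping a client would need today if it handled this itself. It only tracks timing and is independent of any I/O; `Heartbeat` and `HeartbeatAction` are hypothetical names for illustration, not part of tokio-tungstenite.

```rust
use std::time::{Duration, Instant};

/// What the caller should do next on the connection.
#[derive(Debug, PartialEq)]
enum HeartbeatAction {
    /// Nothing to do yet.
    Idle,
    /// Send a Ping frame to the peer.
    SendPing,
    /// No Pong arrived within the timeout; drop the connection.
    Timeout,
}

/// Tracks ping/pong timing for one connection (hypothetical helper).
struct Heartbeat {
    interval: Duration,
    timeout: Duration,
    last_ping: Instant,
    last_pong: Instant,
}

impl Heartbeat {
    fn new(now: Instant, interval: Duration, timeout: Duration) -> Self {
        Heartbeat { interval, timeout, last_ping: now, last_pong: now }
    }

    /// Call when a Pong frame is received.
    fn on_pong(&mut self, now: Instant) {
        self.last_pong = now;
    }

    /// Call periodically (for example from a timer) to decide the next step.
    fn tick(&mut self, now: Instant) -> HeartbeatAction {
        if now.duration_since(self.last_pong) >= self.timeout {
            HeartbeatAction::Timeout
        } else if now.duration_since(self.last_ping) >= self.interval {
            self.last_ping = now;
            HeartbeatAction::SendPing
        } else {
            HeartbeatAction::Idle
        }
    }
}
```

The caller would drive `tick` from a timer, send a Ping on `SendPing`, and close the socket on `Timeout`. Having this inside the library would remove that wiring from every client.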

Extracting web-socket headers after handshake

Hello!

I'm looking for a way to extract the headers after the handshake between client and server. I'd like to do this because I'm going to check for a token in the request and, if it doesn't exist, return an error and shut down the connection until a token is specified.

Is it possible to do something like this (part of this was taken from an example):

accept_async(stream)
    .and_then(move |ws_stream| {
        let req = Request::try_parse(ws_stream).unwrap();
        println!("Received a new ws handshake");
        println!("The request's path is: {}", req.path);
        println!("The request's headers are:");
        for &(ref header, _ /* value */) in req.headers.iter() {
            println!("* {}", header);
        }
        Ok(ws_stream)
    })
    .and_then(move |ws_stream| {
        ...
    })
});

instead of implementing a special callback structure with the Callback trait and using it with accept_hdr_async?
When I try to do this via accept_hdr_async and send the message with code like this:

let auth_middleware_callback = |request: &Request| {
    let auth_middleware_inner = self.auth_middleware.clone();
    let processing_result = auth_middleware_inner.borrow().process_request(request);

    match processing_result {
        Ok(_) => Ok(None),
        Err(err) => {
            let formatted_error = format!("{}", err);
            let message = formatted_error.into_bytes();

            let (_, out) = stream.split();
            use futures::Stream;
            out.write_all(message).unwrap();
            out.flush();
            Err(TungsteniteError::Http(401))
        }
    }
};

it leads to the following error:

error[E0599]: no method named `split` found for type `tokio_core::net::TcpStream` in the current scope
  --> src/proxy.rs:65:50
   |
65 |                         let (_, out) = stream.split();
   |                                               ^^^^^
   |
   = note: the method `split` exists but the following trait bounds were not satisfied:
           `tokio_core::net::TcpStream : futures::Stream`
   = help: items from traits can only be used if the trait is in scope
   = note: the following traits are implemented but not in scope, perhaps add a `use` for one of them:
           candidate #1: `use std::io::BufRead;`
           candidate #2: `use futures::Stream;`
           candidate #3: `use tokio_core::io::Io;`
           candidate #4: `use tokio_io::AsyncRead;`

error: aborting due to previous error

Another reason I want to do this is to make the behavior customizable for a reverse proxy. For example, if the -v/--validate option is specified, the passed token must be checked and validated after the connection is established. Otherwise no additional work is done.
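
The check I have in mind is roughly the following. This is only a sketch on plain `(name, value)` pairs, assuming the token arrives in an `Authorization` header. `AuthCheck`, `find_header`, and `check_token` are hypothetical names, and the header representation is not tungstenite's `Request` type.

```rust
/// Outcome of checking a handshake request (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum AuthCheck {
    /// Continue with the WebSocket handshake.
    Accept,
    /// Refuse the handshake with this HTTP status code.
    Reject(u16),
}

/// Looks up a header by name, ignoring ASCII case as HTTP header names require.
fn find_header<'a>(headers: &'a [(&'a str, &'a str)], name: &str) -> Option<&'a str> {
    headers
        .iter()
        .find(|(key, _)| key.eq_ignore_ascii_case(name))
        .map(|(_, value)| *value)
}

/// Validates the token only when the -v/--validate option is enabled.
fn check_token(headers: &[(&str, &str)], validate: bool) -> AuthCheck {
    if !validate {
        return AuthCheck::Accept;
    }
    match find_header(headers, "Authorization") {
        Some(token) if !token.trim().is_empty() => AuthCheck::Accept,
        _ => AuthCheck::Reject(401),
    }
}
```

A real check would verify the token's contents rather than only its presence.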

Specifying values in Sec-WebSocket-Protocol header

Hello!

I'm trying to add headers for a client that works over a WebSocket connection, but I get the following error in my browser's console when creating a new connection between the client and the server:

WebSocket connection to 'ws://127.0.0.1:8080/' failed: Error during WebSocket handshake: Sent non-empty 'Sec-WebSocket-Protocol' header but no response was received

On the client side I'm using simple JavaScript embedded in an HTML page to test reverse proxy functionality written with tokio-tungstenite:

var ws = null;
var isopen = false;

window.onload = function() {

   ws = new WebSocket("ws://127.0.0.1:8080", "token");

   ws.onopen = function() {
      console.log("Connected!");
      isopen = true;
   };

   ws.onmessage = function(e) {
       console.log(e.data);
   };

   ws.onclose = function(e) {
      console.log("Connection closed.");
      ws = null;
      isopen = false;
   }
};

function sendOnStaticURL() {
   if (isopen) {
      ws.send(JSON.stringify({
         'content': {'test': 'value'},
         'url': '/api/matchmaking/search'
      }));
      console.log("sendOnStaticURL() clicked.");
   } else {
      console.log("Connection not opened.")
   }
}

So the question is: does the tokio-tungstenite library let me add values to the "Sec-WebSocket-Protocol" header? Or is this not supported yet, so that I should find another way to provide a token (like var ws = new WebSocket("ws://example.com/?token=testtoken"))?
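
For reference, the browser error above appears when the client offers subprotocols and the handshake response carries no Sec-WebSocket-Protocol header. Per RFC 6455 the server has to echo back one of the offered values. The selection step could look like the sketch below; `select_subprotocol` is a hypothetical helper, and how the chosen value gets into the response depends on the server API.

```rust
/// Picks the first subprotocol offered by the client that the server supports.
/// `offered` is the raw value of the request's Sec-WebSocket-Protocol header,
/// a comma-separated list such as "chat, token".
fn select_subprotocol<'a>(offered: &'a str, supported: &[&str]) -> Option<&'a str> {
    offered
        .split(',')
        .map(str::trim)
        .find(|candidate| !candidate.is_empty() && supported.contains(candidate))
}
```

With the JavaScript above, `offered` would be `"token"`, so the server would need to list `"token"` as supported and return it in the response header.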

tokio-tungstenite and hyper server

Hi, I am writing a server where I need WebSocket connections. I have a whole server written in hyper. How can I add tokio-tungstenite to the hyper server?

It would be good if you could show a simple example of that, as I am still a newbie in Rust :)

Thank you.

tokio-tungstenite is incompatible with new tokio crate

Hello,

Recently the tokio crate was reworked, and building an application or library against the new tokio crate (instead of tokio-core and so on) breaks an existing codebase.

In my case, when I updated the dependencies in the Cargo.toml file of my application and rebuilt the project, I got the following error:

error[E0308]: mismatched types
  --> src\proxy.rs:63:52
   |
63 |         let server = listener.incoming().for_each(|(stream, addr)| {
   |                                                    ^^^^^^^^^^^^^^ expected struct `tokio::net::TcpStream`, found tuple
   |
   = note: expected type `tokio::net::TcpStream`
              found type `(_, _)`

Does the tokio-tungstenite crate have any plans to migrate to the new tokio crate?

The trait bound `tungstenite::handshake::client::Request<'static>: std::convert::From<url::Url>` is not satisfied?

Something new for previously working code?

error[E0277]: the trait bound `tungstenite::handshake::client::Request<'static>: std::convert::From<url::Url>` is not satisfied
  --> src\exchanges\connection.rs:24:30
   |
24 |         let (ws, response) = tokio_tungstenite::connect_async(url).compat().await?;
   |                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::convert::From<url::Url>` is not implemented for `tungstenite::handshake::client::Request<'static>`
   |
   = help: the following implementations were found:
             <tungstenite::handshake::client::Request<'static> as std::convert::From<url::Url>>
   = note: required because of the requirements on the impl of `std::convert::Into<tungstenite::handshake::client::Request<'static>>` for `url::Url`
   = note: required by `tokio_tungstenite::connect::connect_async`

`accept_async` does immediate async i/o unexpectedly

I found out the hard way that accept_async does not just return a future that handles the handshake: it immediately accesses the async i/o object to start the handshake and returns a future to complete it.

This means that accept_async must be called from a task context, which I think isn't obvious and maybe should be stated more clearly.

In a pure futures 0.1 application, this most probably doesn't matter since accept_async is called when the accepted TcpStream is available, which pretty much always happens inside a task. However, if you're using futures 0.3, you need to wrap a futures 0.1 future in a compatibility wrapper to use it with 0.3, and a current task is only provided to the future while polling it. This currently doesn't work well with tokio-tungstenite (see rust-lang/futures-rs#1463).

I'd like to propose changing the accept functions to only return a future and do the actual handshake when the future gets polled. That also seems to be the way everybody else handles it (async fns, futures-rs functions and methods, any other crate I can remember). It is probably the most expected behavior and, as far as I can see, would make it work smoothly with futures 0.3 compatibility wrappers.
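
To illustrate the behavior I'm proposing, here is a small sketch using only `std::future`. The counter stands in for "the i/o object was touched". `LazyHandshake`, `lazy_handshake`, and `poll_once` are made-up names for illustration and have nothing to do with the crate's real types.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future by hand in this sketch.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for a handshake future (hypothetical).
struct LazyHandshake {
    io_touched: Rc<Cell<u32>>,
}

impl Future for LazyHandshake {
    type Output = &'static str;

    /// All work happens here, so a task context is always available.
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.io_touched.set(self.io_touched.get() + 1);
        Poll::Ready("handshake done")
    }
}

/// Constructing the future only stores the arguments; no i/o happens yet.
fn lazy_handshake(io_touched: Rc<Cell<u32>>) -> LazyHandshake {
    LazyHandshake { io_touched }
}

/// Polls a future once with a no-op waker and returns the result.
fn poll_once<F: Future>(fut: Pin<&mut F>) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.poll(&mut cx)
}
```

With this shape the counter stays at zero after `lazy_handshake(...)` returns and only changes once `poll_once` runs, so the constructor can be called safely outside a task.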

Occasional connection problem

Hello,

When using the Tokio version of Tungstenite, the connection occasionally gets stuck. Based on a quick look at the gdb backtrace, it may be stuck on the receive side during connection handling. The main thread is blocked after this. Let me know if I can do anything to help debug this.

0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
28      ../sysdeps/unix/sysv/linux/x86_64/recv.c: No such file or directory.
(gdb) info threads
  Id   Target Id         Frame
* 1    Thread 0x7ffff7fe9840 (LWP 30558) "mles-client" 0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
  2    Thread 0x7ffff65ff700 (LWP 30562) "mles-client" 0x00007ffff6c28e23 in epoll_wait () at ../sysdeps/unix/syscall-template.S:84
  3    Thread 0x7ffff63fe700 (LWP 30563) "mles-client" 0x00007ffff6c28e23 in epoll_wait () at ../sysdeps/unix/syscall-template.S:84
(gdb) bt
#0  0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
#1  0x00005555555ba7f2 in std::sys::imp::net::{{impl}}::recv_with_flags () at /checkout/src/libstd/sys/unix/net.rs:161
#2  std::sys::imp::net::{{impl}}::read () at /checkout/src/libstd/sys/unix/net.rs:170
#3  std::sys_common::net::{{impl}}::read () at /checkout/src/libstd/sys_common/net.rs:230
#4  std::net::tcp::{{impl}}::read () at /checkout/src/libstd/net/tcp.rs:459
#5  0x00005555555a9197 in _$LT$mio..net..tcp..TcpStream$u20$as$u20$std..io..Read$GT$::read::hd22429470efbac49 ()
#6  0x000055555559d75a in _$LT$tokio_core..net..tcp..TcpStream$u20$as$u20$std..io..Read$GT$::read::he4646848c964c4b7 ()
#7  0x0000555555563c9c in tungstenite::input_buffer::InputBuffer::read_from::hdce1e509ab469976 ()
#8  0x000055555556e119 in _$LT$tungstenite..handshake..MidHandshake$LT$Stream$C$$u20$Role$GT$$GT$::handshake::h117e18082a560944 ()
#9  0x000055555555fb26 in _$LT$futures..stream..for_each..ForEach$LT$S$C$$u20$F$C$$u20$U$GT$$u20$as$u20$futures..future..Future$GT$::poll::hce925305841b9174 ()
#10 0x000055555557cd8a in mles_client::main::h6272790849884ec7 ()
#11 0x00005555555c38c6 in std::panicking::try::do_call<fn(),()> () at /checkout/src/libstd/panicking.rs:454
#12 0x00005555555ca7eb in panic_unwind::__rust_maybe_catch_panic () at /checkout/src/libpanic_unwind/lib.rs:98
#13 0x00005555555c4117 in std::panicking::try<(),fn()> () at /checkout/src/libstd/panicking.rs:433
#14 std::panic::catch_unwind<fn(),()> () at /checkout/src/libstd/panic.rs:361
#15 std::rt::lang_start () at /checkout/src/libstd/rt.rs:57
#16 0x00007ffff6b42830 in __libc_start_main (main=0x5555555821b0 <main>, argc=3, argv=0x7fffffffdfd8, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>,
    stack_end=0x7fffffffdfc8) at ../csu/libc-start.c:291
#17 0x000055555555f5c9 in _start ()

Ref jq-rs/mles-rs#80

tokio_core::Framed for tokio_tungstenite?

I'm attempting to implement websockets for the rumqtt MQTT library and I'm running into some issues using this crate. My main problem is mentioned here:
AtherEnergy/rumqtt#70

But what I'd like to do is create an instance of tokio_core::Framed (to keep consistency with the rest of rumqtt) from a WebSocketStream. I am unsure how I could implement Read / AsyncRead for WebSocketStream, and was wondering if you had any suggestions?

I can't really see a way to do it with the current interfaces, since as far as I can tell, WebSocketStream is essentially equivalent to Framed, but without the ability to add encoders and decoders

I'm extremely new to tokio, so forgive me if I've completely misunderstood anything here.

how to close the connection gracefully

Does tokio-tungstenite have something to close the connection gracefully, or do users need to handle it themselves (I guess with a Close frame and a timeout)?

Compiling Examples

I'm sure I'm doing something incorrectly but cannot find the issue. I'm trying to compile the simple client.rs and server.rs example but receiving a bunch of errors.

I'm assuming I'm using the wrong version of one of the dependencies?

Cargo.toml:

[package]
name = "game"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
log = "0.4"
futures = "0.3.1"
pin-project = "0.4"
tokio = { version = "0.2", features = ["full"] }
url="*"
tungstenite="*"
tokio-tungstenite="*"
env_logger = "0.7"

And here are the errors I'm receiving:

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
   --> src/server.rs:46:53
    |
46  |     let ws_stream = tokio_tungstenite::accept_async(raw_stream)
    |                                                     ^^^^^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
    | 
   ::: /home/foo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.9.0/src/lib.rs:114:8
    |
114 |     S: AsyncRead + AsyncWrite,
    |        --------- required by this bound in `tokio_tungstenite::accept_async`

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
   --> src/server.rs:46:53
    |
46  |     let ws_stream = tokio_tungstenite::accept_async(raw_stream)
    |                                                     ^^^^^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
    | 
   ::: /home/foo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.9.0/src/lib.rs:114:20
    |
114 |     S: AsyncRead + AsyncWrite,
    |                    ---------- required by this bound in `tokio_tungstenite::accept_async`

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
  --> src/server.rs:46:21
   |
46 |     let ws_stream = tokio_tungstenite::accept_async(raw_stream)
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
   |
   = note: required by `tokio_tungstenite::AcceptAsync`

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
  --> src/server.rs:46:21
   |
46 |     let ws_stream = tokio_tungstenite::accept_async(raw_stream)
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
   |
   = note: required by `tokio_tungstenite::AcceptAsync`

error[E0277]: the trait bound `tokio_tungstenite::AcceptAsync<tokio::net::tcp::stream::TcpStream, tungstenite::handshake::server::NoCallback>: core::future::future::Future` is not satisfied
   --> src/server.rs:46:21
    |
46  |       let ws_stream = tokio_tungstenite::accept_async(raw_stream)
    |  _____________________^
47  | |         .await
    | |______________^ the trait `core::future::future::Future` is not implemented for `tokio_tungstenite::AcceptAsync<tokio::net::tcp::stream::TcpStream, tungstenite::handshake::server::NoCallback>`

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
  --> src/server.rs:46:21
   |
46 |       let ws_stream = tokio_tungstenite::accept_async(raw_stream)
   |  _____________________^
47 | |         .await
   | |______________^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
   |
   = note: required by `tokio_tungstenite::AcceptAsync`

error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
  --> src/server.rs:46:21
   |
46 |       let ws_stream = tokio_tungstenite::accept_async(raw_stream)
   |  _____________________^
47 | |         .await
   | |______________^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
   |
   = note: required by `tokio_tungstenite::AcceptAsync`

error[E0277]: the trait bound `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send: std::marker::Unpin` is not satisfied
   --> src/client.rs:31:26
    |
31  |     let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    |                          ^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send`
    |
    = note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send>`

error[E0277]: the trait bound `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send: core::future::future::Future` is not satisfied
   --> src/client.rs:31:26
    |
31  |     let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
    |                          ^^^^^^^^^^^^^^^^^^^^^^^^ the trait `core::future::future::Future` is not implemented for `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send`
    |
    = note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send>`

error: aborting due to 9 previous errors


Processing server and WebSocket errors

Hi!

I'm trying to write error handling for some corner cases, with code like this:

let server = socket.incoming().for_each(|(stream, _addr)| {
    let proxy_inner = self;

    // Handler per each connection.
    accept_async(stream)
        .map_err(|err| {
            println!("Occurred an error during the WebSocket handshake: {}", err);
            Error::new(ErrorKind::Other, err)
        })
        // Check the Auth header
        .and_then(move |ws_stream| {
            println!("Checking the user's token");
            Ok(ws_stream)
        })
        // Process the messages
        .and_then(move |ws_stream| {
            let (mut sink, stream) = ws_stream.split();

            // Per each message
            stream.for_each(move |message: Message| {
                // Convert an incoming message into JSON
                let text_message = try!(message.into_text());
                let parsed_json = try!(proxy_inner.decode_message(text_message.as_str()));

                // Convert the specified API url to a Kafka topic
                let json_message = try!(proxy_inner.validate_json(parsed_json));
                let microservice = proxy_inner.match_microservice(json_message);
                Ok(())
            })
            // Validating errors
            .or_else(|err| {
                println!("Occurred error during processing a message: {}", err);
                // sink.start_send(Message::Text(format!("{}", err))); // <-- sink does not live long enough, if we will try so
                Ok(())
            })
        }).or_else(|err| {
            // Convert an error here to something human-readable
            println!("Occurred error with the WebSocket connection: {}", err);
            Ok(())
        })
});

And it works, but when an error is raised inside this code, the terminal output is not very helpful (at least for understanding what's going on):

Listening on: 127.0.0.1:8080
Checking the user's token
Occurred error during the processing a message: Connection closed:  (1001) // <-- Page with JS + WebSocket was reloaded by a user manually

So, the questions here are:

  1. Does tungstenite/tokio-tungstenite provide some functionality for converting a WebSocket error into a human-readable error, like taking an error number and turning it into a description? Or should I implement my own mapping for this purpose?
  2. Is it a good idea to send a validation error with the sink from inside the or_else block, or is it better to do it as in the tokio-tungstenite example? If the inner block sends the error details via the sink (as in the example), will that still work when several messages are being processed simultaneously?
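
For the first question, a minimal sketch of a hand-written mapping, assuming only the close status codes defined in RFC 6455. The function name describe_close_code is hypothetical and is not part of tungstenite or tokio-tungstenite; the sketch does not rely on any crate API.

```rust
/// Describe a WebSocket close status code using the meanings from RFC 6455.
/// Hypothetical helper for illustration; not provided by tungstenite.
fn describe_close_code(code: u16) -> &'static str {
    match code {
        1000 => "normal closure",
        1001 => "going away",
        1002 => "protocol error",
        1003 => "unsupported data",
        1005 => "no status code was present",
        1006 => "abnormal closure",
        1007 => "invalid frame payload data",
        1008 => "policy violation",
        1009 => "message too big",
        1010 => "mandatory extension missing",
        1011 => "internal error",
        1015 => "TLS handshake failure",
        // Ranges reserved by the RFC for registered and private use.
        3000..=3999 => "code registered by a library or framework",
        4000..=4999 => "application-defined code",
        _ => "unknown or reserved code",
    }
}

fn main() {
    for code in &[1000u16, 1001, 1006, 4001] {
        println!("{}: {}", code, describe_close_code(*code));
    }
}
```

With such a helper, the (1001) from the log above would be reported as "going away", which is what a browser sends when the page is reloaded or closed.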

'Error during the websocket handshake occurred: HTTP code: 400' with TLS and nginx

It's weird: tokio-tungstenite won't connect while other WebSocket clients do. It seems fine if I use tungstenite-rs directly.

I'm not sure if I'm using tokio-tungstenite incorrectly or if my nginx config is too strict.
SSL Labs also says that "This site works only in browsers with SNI support."

I'm basically using examples/client.rs with a "wss://" URL.

nginx SSL config:

ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers 'ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:ECDHE-ECDSA-DES-CBC3-SHA:ECDHE-RSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:DES-CBC3-SHA:!DSS';

Custom lifetimes for callback request headers and futures

As mentioned in the issue about headers, I decided to move part of the callback code into a future and use it later. But I'm getting errors when trying to assign a custom lifetime so that the client headers outlive the futures and I can use them inside the futures (for example, to check a token inside the main processing part). Once the client stops sending requests (or the connection is closed by the server for some reason), the headers should be dropped.

Code:

pub fn run<'a: 'b, 'b>(&self, address: SocketAddr) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let socket = TcpListener::bind(&address, &handle).unwrap();
    println!("Listening on: {}", address);

    let server = socket.incoming().for_each(|(stream, addr)| {
        let engine_inner = self.engine.clone();
        let connections_inner = self.connections.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        let handle_inner = handle.clone();

        let headers: &'a Headers = &HashMap::new();
        let copy_headers_callback = |request: &'b Request| {
            // Just copy the headers as is
            for &(ref name, ref value) in request.headers.iter() {
                headers[&name.clone()] = value.clone()
            }
            Ok(None)
        };

        accept_hdr_async<'b>(stream, copy_headers_callback)
            // Process the messages
            .and_then(move |ws_stream| {
                // Create a channel for the stream, which other sockets will use to
                // send us messages. It could be used for broadcasting your data to
                // another users in the future.
                let (tx, rx) = mpsc::unbounded();
                connections_inner.borrow_mut().insert(addr, tx);

                // Split the WebSocket stream so that it will be possible to work
                // with the reading and writing halves separately.
                let (sink, stream) = ws_stream.split();

                // Check headers, if we're working in a secured mode
                let auth_future = auth_middleware_inner.borrow().process_request(headers, &handle_inner);
                ....
    });

    // Run the server
    core.run(server).unwrap();
}

Errors:

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/proxy.rs:51:40
   |
51 |         let server = socket.incoming().for_each(|(stream, addr)| {
   |                                        ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0277]: the trait bound `(): futures::Future` is not satisfied
   --> src/proxy.rs:119:14
    |
119 |         core.run(server).unwrap();
    |              ^^^ the trait `futures::Future` is not implemented for `()`
    |
    = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
    = note: required because of the requirements on the impl of `futures::Future` for `futures::stream::ForEach<tokio_core::net::Incoming, [closure@src/proxy.rs:51:49: 116:10 self:_, handle:_], ()>`

Any ideas how it could be fixed?

How to read/capture the original error when using connect_async?

I have a load test client that is doing something like

connect_async(wsaddr)
        .await
        .expect("problems connecting to ws");

I'm running my client and I see panics such as:

panicked at 'problems connecting to ws: Io(Custom { kind: Other, error: "" })'

but I'd like to understand what the root cause is.

Is there any way to capture/print/see the original lower-level error that may have caused this?
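
One general technique, sketched here with std only: instead of calling expect, keep the error value and walk its source() chain. The Layered type and the error_chain function are hypothetical stand-ins for illustration; whether a particular tungstenite or io error actually exposes an inner cause through source() is an assumption that has to be checked against the real error value.

```rust
use std::error::Error;
use std::fmt;

/// Hypothetical error type standing in for a library error that wraps a cause.
#[derive(Debug)]
struct Layered {
    msg: &'static str,
    source: Option<Box<dyn Error + 'static>>,
}

impl fmt::Display for Layered {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.msg)
    }
}

impl Error for Layered {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        self.source.as_deref()
    }
}

/// Collect the message of an error and of every underlying cause it reports.
fn error_chain(err: &(dyn Error + 'static)) -> Vec<String> {
    let mut chain = vec![err.to_string()];
    let mut current = err.source();
    while let Some(inner) = current {
        chain.push(inner.to_string());
        current = inner.source();
    }
    chain
}

fn main() {
    let err = Layered {
        msg: "problems connecting to ws",
        source: Some(Box::new(Layered { msg: "connection refused", source: None })),
    };
    // Print each layer on its own line instead of panicking with expect().
    for (depth, msg) in error_chain(&err).iter().enumerate() {
        println!("{}: {}", depth, msg);
    }
}
```

In the load test client this would mean matching on the Result returned by connect_async(wsaddr).await and logging the chain in the Err branch, rather than panicking on the outermost error.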

Not receiving incoming messages via stream

I have some tokio-tungstenite code that's using the latest commit on master and it works beautifully on my Macbook and my CI for Windows passes just fine. The problem is that it seems like the reader never receives any messages!

My code for repro can be found here and it's loosely based on the pattern described in examples/client.rs.

I understand that I'm using the latest commit on master and I'm not expecting this to be stable but wanted to get it on the radar :) Please let me know if there's anything else I can do to help with this!

Update to use tokio-io crate as the tokio_core::io mod is deprecated

First Blood!!
Need to update this crate with the new tokio-io library.
Here are the warnings I'm getting:

warning: use of deprecated item: moved to the `tokio-io` crate
  --> src/lib.rs:31:5
   |
31 | use tokio_core::io::Io;
   |     ^^^^^^^^^^^^^^^^^^
   |
   = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
  --> src/lib.rs:53:24
   |
53 | pub fn client_async<S: Io>(url: Url, stream: S) -> ConnectAsync<S> {
   |                        ^^
   |
   = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
  --> src/lib.rs:72:24
   |
72 | pub fn accept_async<S: Io>(stream: S) -> AcceptAsync<S> {
   |                        ^^
   |
   = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
  --> src/lib.rs:93:48
   |
93 | impl<T> Stream for WebSocketStream<T> where T: Io {
   |                                                ^^
   |
   = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
   --> src/lib.rs:102:46
    |
102 | impl<T> Sink for WebSocketStream<T> where T: Io {
    |                                              ^^
    |
    = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
   --> src/lib.rs:122:9
    |
122 | impl<S: Io> Future for ConnectAsync<S> {
    |         ^^
    |
    = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
   --> src/lib.rs:137:9
    |
137 | impl<S: Io> Future for AcceptAsync<S> {
    |         ^^
    |
    = note: #[warn(deprecated)] on by default

warning: use of deprecated item: moved to the `tokio-io` crate
   --> src/lib.rs:150:9
    |
150 | impl<S: Io, R: HandshakeRole> Future for MidHandshake<S, R> {
    |         ^^
    |
    = note: #[warn(deprecated)] on by default

    Finished dev [unoptimized + debuginfo] target(s) in 19.26 secs

Example for client connecting to multiple servers

I got your example for connecting to a WebSocket server working, but I need to connect to multiple servers and am stuck. I am very new to Rust and I'm not sure how to pass multiple things to tokio::run. I think I need to use tokio's join_all and maybe futures' lazy, but I'm not sure exactly how and cannot find an example.

What is the best way to change https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs to create multiple connections to servers with different APIs?

WebSocketStream Sink implementation doesn't apply back-pressure

First, thanks for the sweet crate!

start_send() just accepts whatever message you give it and always asks for more! If the messages are coming in faster than Tungstenite can get them out, the underlying WebSocket::send_queue just fills up forever.

Besides the memory usage problem, in my server I want to deal with clients that can't take the bandwidth, either by kicking them or by dropping some packets. The wrapper Sink I made to accomplish this is especially hacky to work around the fact that start_send() will never refuse a message.

Perhaps there should be a configurable max send_queue len? I can make a PR if you're into that.
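
To make the proposal concrete, here is a minimal std-only sketch of a queue with a configurable maximum length. BoundedQueue, try_push and pop are hypothetical names for illustration and do not reflect how tungstenite's send_queue is actually implemented.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded outgoing queue; not tungstenite's actual send_queue.
struct BoundedQueue<T> {
    items: VecDeque<T>,
    max_len: usize,
}

impl<T> BoundedQueue<T> {
    fn new(max_len: usize) -> Self {
        BoundedQueue { items: VecDeque::new(), max_len }
    }

    /// Accept the item if there is room, otherwise hand it back to the caller.
    fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.items.len() >= self.max_len {
            Err(item)
        } else {
            self.items.push_back(item);
            Ok(())
        }
    }

    /// Remove the oldest item, as a writer would after flushing it to the socket.
    fn pop(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

fn main() {
    let mut queue = BoundedQueue::new(2);
    for msg in &["a", "b", "c"] {
        match queue.try_push(*msg) {
            Ok(()) => println!("queued {}", msg),
            Err(rejected) => println!("queue full, refused {}", rejected),
        }
    }
    // After one message is flushed there is room again.
    queue.pop();
    println!("retry: {:?}", queue.try_push("c"));
}
```

In a futures 0.1 Sink, the Err(item) case would correspond to start_send() returning AsyncSink::NotReady(item), which is the signal the caller needs in order to slow down, drop packets, or kick the client.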

WS Connection suddenly drops

Hi guys, I may have a noob question, but I need help. I can open a new connection but can't hold it open indefinitely; it drops after some time. Here is the code:

let task_orderbook = connect_async(url.clone()).and_then(|(ws, _response)| {
    println!("Orderbook stream successfully established!");
    let subscription = "{\"event\":\"subscribe\",\"channel\":\"book\",\"prec\":\"R0\",\"symbol\":\"tBTCUSD\"}";
    let (sink, stream) = ws.split();
    sink.send(subscription.into()).wait().ok();
    let result = stream.for_each(move |message| { stdout.write_all(&message.into_data()).unwrap(); stdout.flush().unwrap(); Ok(()) });
    result
        .map(|x|{println!("{:?}",x);()})
        //.select(sink.send(subscription.into()).map_err(|_|()))
        .then(|_|Ok(()))
}).map_err(|e| {
    println!("Error during the websocket handshake occurred: {}", e);
    io::Error::new(io::ErrorKind::Other, e)
});
tokio::run(task_orderbook);

Cancelation clarification

If I await tokio::time::timeout(web_socket_stream.next()), and the timeout happens before the socket receives a message (and so the web_socket_stream.next() future gets dropped), would this lead to skipping a message? Or is it OK to keep polling web_socket_stream.next() and ignore the result as long as it does not return Poll::Ready?

Holding one connection for multiple requests from a user

Hello!
I'm trying to write my own server with this crate. If I do it as in the code below:

pub struct Proxy {
    router: Box<Router>,
}

impl Proxy {
    pub fn new(router: Box<Router>) -> Proxy {
        Proxy {
            router: router
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let socket = TcpListener::bind(&address, &handle).unwrap();
        println!("Listening on: {}", address);

        // The server loop.
        let server = socket.incoming().for_each(|(stream, addr)| {
            let handle_inner = handle.clone();

            // Handler per each connection.
            accept_async(stream).and_then(move |ws_stream| {
                println!("Checking the user's token");
                Ok(ws_stream)
            }).for_each(|ws_stream| {  // <--- Handling each incoming message separately?
                println!("Processing...");
                Ok(())
            })
        });

        // Run the server
        core.run(server).unwrap();
    }
}

then I get the following error:

relrin at relrin in ~/Code/pathfinder/pathfinder exited 101 on git:master+ 
=> cargo run -- -c ./tests/files/config_with_valid_endpoints.yaml 
   Compiling pathfinder v0.1.0 (file:///home/relrin/Code/pathfinder/pathfinder)
error[E0599]: no method named `for_each` found for type `futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]>` in the current scope
  --> src/proxy.rs:41:16
   |
41 |             }).for_each(|ws_stream| {
   |                ^^^^^^^^
   |
   = note: the method `for_each` exists but the following trait bounds were not satisfied:
           `futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]> : futures::Stream`
           `futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]> : std::iter::Iterator`

error: aborting due to previous error

So, my question is how to make this case work on the server side. As far as I understand, the tokio-tungstenite crate doesn't let me work this way, because it presumes that one connection equals one message, right?
It would be great if the server could hold the connection with the client and process as many messages from it as possible until the client disconnects (or the server closes the connection).

cfg(not(feature="tls")) => leads to import error

use tokio_io::{AsyncRead, AsyncWrite};

I think tokio::io would be correct. Changing it leads to this:

   --> /home/mk/dev/tokio-tungstenite/src/connect.rs:124:5
    |
124 | /     Box::new(wrap_stream(stream, domain, mode)
125 | |                 .and_then(|mut stream| {
126 | |                     NoDelay::set_nodelay(&mut stream, true)
127 | |                         .map(move |()| stream)
128 | |                         .map_err(|e| e.into())
129 | |                 })
130 | |                 .and_then(move |stream| client_async(request, stream)))
    | |_______________________________________________________________________^ `futures::Future<Error=tungstenite::Error, Item=S>` cannot be sent between threads safely

Could crates.io require updating with the master branch on GitHub?

Hello, I'm new to Rust so I'm probably just a bit confused but I was looking through the docs for what appears to be the latest version of tokio-tungstenite on crates.io (v0.6.0), specifically the source for WebSocketStream in lib.rs (https://docs.rs/tokio-tungstenite/0.6.0/src/tokio_tungstenite/lib.rs.html#162-164), and it appears that the code for WebSocketStream is a fair bit different to that on the GitHub master branch. For example, comparing the struct and the poll function in 'impl Stream for WebSocketStream':

(crates.io source)

pub struct WebSocketStream<S> {
    inner: WebSocket<S>,
}

(GitHub source)

pub struct WebSocketStream<S> {
    inner: WebSocket<S>,
    stream_ended: bool,
}

(crates.io source)

fn poll(&mut self) -> Poll<Option<Message>, WsError> {
        self.inner.read_message().map(|m| Some(m)).to_async()
}

(GitHub source)

fn poll(&mut self) -> Poll<Option<Message>, WsError> {
        if self.stream_ended {
            self.stream_ended = false;
            return Ok(Async::Ready(None))
        }

        self.inner.read_message().map(|m| {
            if m.is_close() {
                self.stream_ended = true;
            }
            Some(m)
        }).to_async()
}

At least for me I can confirm that when I add 'tokio-tungstenite = "*"' to my Cargo.toml it uses the code that I can see on crates.io. Is this the case for anyone else?

(Edit: changed format for code blocks)

are pings handled in async mode?

Looking at #6 it looks like pings should be already supported and responded to when using tungstenite as a client.
Is that correct?

I wrote a ws client and I see my server sending a Ping but never getting a response from the client.
Is there any way I can confirm pings are being replied to?
Is there a "connect and wait forever" sample client I can look at to check what am I doing wrong?

The issue is most likely on my side; if you have any suggestions on how to troubleshoot, I'd appreciate it.

Processing multiple requests per each user

Hello!
I have a couple of questions about using the tokio-tungstenite crate in a project where a user can send multiple requests to a WebSocket server that works with Kafka and returns some result to the caller:

  • Does the current state of the codebase provide a good way to solve this task? Or should I implement additional wrappers to support it (with futures, thread pools, etc.)?
  • Can I somehow write code (perhaps there is a method for it) that lets me check user credentials (an existing and valid token in Redis) after the handshake, but before processing requests? If something goes wrong (for example, an invalid token), it should return an error to the caller.

Example for handling connections

I'm having a hard time handling new connections. When a new stream connects I want to send messages directly back and forth between that client to establish username/etc. However, the examples aren't very clear to me on how to create a handle_connection function.

I noticed in tungstenite-rs the autobahn server example does though. Is it similar to that? I've tried sending using the sink but I can't because ws_writer uses it.

Any advice would be great.

Client example no longer compiles

I know that there have been big changes as Tokio has been moving through point releases, but I thought I'd point out the obvious: examples/client.rs no longer compiles:

error[E0432]: unresolved import `futures::sync`
use futures::sync::mpsc;

error[E0425]: cannot find function `run` in module `tokio::runtime`
tokio::runtime::run(client.map_err(|_e| ()));

error: the `and_then` method cannot be invoked on a trait object
let client = connect_async(url).and_then(move |(ws_stream, _)| {

Echo server is not working

Reproduce:
In browser console: var exampleSocket = new WebSocket("ws://127.0.0.1:8080");

Server logs:

thread 'main' panicked at 'Error during the websocket handshake occurred: Io(Custom { kind: Other, error: "No \"Connection: upgrade\" in client request" })', src/libcore/result.rs:1192:5
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
   1: backtrace::backtrace::trace_unsynchronized
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:77
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:59
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1057
   5: std::io::Write::write_fmt
             at src/libstd/io/mod.rs:1426
   6: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:62
   7: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:49
   8: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:195
   9: std::panicking::default_hook
             at src/libstd/panicking.rs:215
  10: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:463
  11: rust_begin_unwind
             at src/libstd/panicking.rs:371
  12: core::panicking::panic_fmt
             at src/libcore/panicking.rs:85
  13: core::result::unwrap_failed
             at src/libcore/result.rs:1192
  14: core::result::Result<T,E>::expect
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libcore/result.rs:991
  15: echo_server::accept_connection::{{closure}}
             at examples/echo-server.rs:42
  16: <std::future::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/future.rs:43
  17: tokio::task::core::Core<T>::poll
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/core.rs:128
  18: tokio::task::harness::Harness<T,S>::poll::{{closure}}::{{closure}}
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:120
  19: core::ops::function::FnOnce::call_once
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libcore/ops/function.rs:232
  20: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panic.rs:318
  21: std::panicking::try::do_call
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panicking.rs:296
  22: __rust_maybe_catch_panic
             at src/libpanic_unwind/lib.rs:79
  23: std::panicking::try
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panicking.rs:272
  24: std::panic::catch_unwind
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panic.rs:394
  25: tokio::task::harness::Harness<T,S>::poll::{{closure}}
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:101
  26: tokio::loom::std::causal_cell::CausalCell<T>::with_mut
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/loom/std/causal_cell.rs:41
  27: tokio::task::harness::Harness<T,S>::poll
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:100
  28: tokio::task::raw::poll
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/raw.rs:162
  29: tokio::task::raw::RawTask::poll
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/raw.rs:113
  30: tokio::task::Task<S>::run
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/mod.rs:381
  31: tokio::runtime::basic_scheduler::SchedulerPriv::tick
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/basic_scheduler.rs:192
  32: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/basic_scheduler.rs:142
  33: tokio::runtime::Runtime::block_on::{{closure}}
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/mod.rs:411
  34: tokio::runtime::context::enter
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/context.rs:72
  35: tokio::runtime::handle::Handle::enter
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/handle.rs:34
  36: tokio::runtime::Runtime::block_on
             at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/mod.rs:408
  37: echo_server::main
             at examples/echo-server.rs:17
  38: std::rt::lang_start::{{closure}}
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
  39: std::rt::lang_start_internal::{{closure}}
             at src/libstd/rt.rs:52
  40: std::panicking::try::do_call
             at src/libstd/panicking.rs:296
  41: __rust_maybe_catch_panic
             at src/libpanic_unwind/lib.rs:79
  42: std::panicking::try
             at src/libstd/panicking.rs:272
  43: std::panic::catch_unwind
             at src/libstd/panic.rs:394
  44: std::rt::lang_start_internal
             at src/libstd/rt.rs:51
  45: std::rt::lang_start
             at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
  46: main
  47: __libc_start_main
  48: _start
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.

Real world example question

I want to adapt the https://github.com/snapview/tokio-tungstenite/blob/46dfd9ed3ee75b0261e9f5f71c8e70492407248b/examples/autobahn-server.rs to my use case. In addition to echoing every message the server should send a message to the client every second.

The new handle_connection function would look like this:

async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
    let mut ws_stream = accept_async(stream).await.expect("Failed to accept");

    info!("New WebSocket connection: {}", peer);

    let mut msg_fut = ws_stream.next();
    loop {
        let delay_fut = delay_for(Duration::from_millis(1000));
        
        match select(msg_fut, delay_fut).await {
            Either::Left((msg, _)) => {
                match msg {
                    Some(msg) => {
                        let msg = msg?;
                        if msg.is_text() || msg.is_binary() {
                            ws_stream.send(msg).await?;
                        }
                        msg_fut = ws_stream.next();
                    },
                    None => {
                        break;
                    },
                };
            },
            Either::Right((_, msg_fut_reuse)) => {
                msg_fut = msg_fut_reuse;
                // Next line error message:
                //
                // error[E0499]: cannot borrow `ws_stream` as mutable more than once at a time
                // --> src\main.rs:48:17
                //    |
                // 25 |     let mut msg_fut = ws_stream.next();
                //    |                       --------- first mutable borrow occurs here
                // ...
                // 29 |         match select(msg_fut, delay_fut).await {
                //    |                      ------- first borrow later used here
                // ...
                // 48 |                 ws_stream.send(Message::Text("hello".to_owned())).await?;
                //    |                 ^^^^^^^^^ second mutable borrow occurs here
                ws_stream.send(Message::Text("hello".to_owned())).await?;
            },
        }
    }

    Ok(())
}

Sadly this does not compile. I have tried to come to an agreement with the borrow checker but I cannot figure out how to reuse msg_fut_reuse for the next iteration of the loop.

Can anyone help me with this? I would gladly contribute it as an example later.

can't use tokio-tungstenite with tokio-timer

I'm not sure if the problem is with tokio-tungstenite or with tokio-timer.

To reproduce, run examples/server.rs with an extra extern crate tokio_timer;.

result:

> cargo build 
   Compiling test-tung-server v0.1.0 (file:///home/bbigras/dev/rust/tests/test-tung-server)
error[E0283]: type annotations required: cannot resolve `(): std::convert::From<_>`
  --> src/main.rs:77:36
   |
77 |                 let ws_writer = rx.fold(sink, |mut sink, msg| {
   |                                    ^^^^

error: aborting due to previous error

error: Could not compile `test-tung-server`.

To learn more, run the command again with --verbose.

rustc 1.17.0-nightly (7846dbe0c 2017-03-26)
futures 0.1.12
tokio-core 0.1.6
tokio-timer 0.1.1
tokio-tungstenite 0.1.2
tungstenite 0.2.1

0.6 release?

Awesome library here! I've been working on a web framework that uses this for websockets, and while getting close to a viable release, it turns out I can't release with a git dependency :D Specifically, being able to use the new tokio crate, and construct a websocket stream after having done my own handshake. Does it seem like 0.6 would be possible soon?

Connecting different async code with tokio-tungstenite

Hello!

I'm trying to implement checks for a token that was specified in the headers, but before doing the actual work I'd like to send a request to an existing Redis instance via an additional future (with the redis-async crate in this case). The prepared future is invoked inside the auth_middleware_callback callback (which is ... nested and potentially leads to "callback hell"?). But I'm stuck on two things:

  • Extracting the value from the generated future with the Redis connection
  • Making the generated future non-blocking when it is nested

So, actual code looks like this:

let server = socket.incoming().for_each(|(stream, addr)| {
    let engine_inner = self.engine.clone();
    let connections_inner = self.connections.clone();
    let handle_inner = handle.clone();

    let auth_middleware_callback = |request: &Request| {
        let handle_inner = handle.clone();
        let auth_middleware_inner = self.auth_middleware.clone();
        // Inside of this middleware we're checking and validating a specified token 
        let processing_result = auth_middleware_inner.borrow().process_request(request, &handle_inner);

        match processing_result {
            Ok(headers) => Ok(headers),
            Err(err) => {
                let formatted_error = format!("{}", err);
                let message = Cow::from(formatted_error);
                Err(TungsteniteError::Protocol(message))
            }
        }
    };

    accept_hdr_async(stream, auth_middleware_callback)
        // Process the messages
        .and_then(move |ws_stream| {
            // other code here...

The piece of code that polls the Redis instance (yes, I know it is a bad approach to get a value from the finished future this way; it's mostly for debugging purposes):

fn get_user_id(&self, raw_token: String, handle: &Handle) -> Result<String> {
    let redis_socket_address = self.redis_address.parse().unwrap();
    let redis_connection = paired_connect(&redis_socket_address, handle);

    // Make the authentication before, if a password was specified.
    let get_user_id_future = redis_connection.and_then(move |connection| {
        // we will get here Ok("some user ID") if everything is good
        connection.send::<String>(resp_array!["GET", raw_token])
    });

    let mut processing_result;
    handle.spawn(get_user_id_future.then(move |response| {
        processing_result = match response {
            Ok(user_id) => Ok(String::from(user_id)),
            Err(_) => {
                let message = String::from("Token is expired or doesn't exist.");
                Err(PathfinderError::AuthenticationError(message))
            }
        };
        Ok(())
    }));

    processing_result
}

So, I have a couple of questions about this:

  • Is it possible to specify (or implement) a custom future so that it can be used later in the chain with the main part of handling a client connection? I mean a case that could look like this:
let auth_middleware_callback = |request: &Request| {
    let handle_inner = handle.clone();
    let auth_middleware_inner = self.auth_middleware.clone();
    let redis_future = auth_middleware_inner.borrow().process_request(request, &handle_inner);
    redis_future
};

accept_hdr_async(stream, auth_middleware_callback)
    // extract a value from redis_future
    .and_then(|(ws_stream, redis_future)| {
        // ...
    })
    // Process the messages
    .and_then(move |ws_stream| {

Or should I go another way in this case (maybe send the token in a message and process it inside, or use accept_async)?

  • If I go back to a solution based on accept_async, can I return ws_stream and redis_future at the same time? Or is it possible to solve this some other way?
  • Are there any suitable ways to combine async connection libraries (like redis-async, librdkafka, etc.) with tokio-tungstenite, so that they work together asynchronously? Perhaps this sounds too generic, but nonetheless.
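On the get_user_id snippet above: a common way to get a result out of spawned work is to hand it back through a channel instead of assigning to a captured local. The sketch below shows only that pattern, using the standard library (a thread and std::sync::mpsc) as a stand-in for the event loop. It is not tokio-tungstenite or redis-async code. The fake_token_store helper and the lookup_user_id name are hypothetical, and the Redis lookup is replaced by an in-memory map. In a futures 0.1 chain, futures::sync::oneshot would play the role of the channel, and its receiver is itself a future that can be chained with and_then.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the Redis lookup: maps a token to a user ID.
fn fake_token_store() -> HashMap<&'static str, &'static str> {
    let mut store = HashMap::new();
    store.insert("token-1", "user-42");
    store
}

/// Runs the lookup on another thread and hands the result back over a channel.
fn lookup_user_id(raw_token: &str) -> Result<String, String> {
    let (sender, receiver) = mpsc::channel();
    let token = raw_token.to_owned();

    // The spawned work owns `sender` and sends its result, instead of
    // assigning to a variable that lives on the caller's stack.
    thread::spawn(move || {
        let result = fake_token_store()
            .get(token.as_str())
            .map(|user_id| user_id.to_string())
            .ok_or_else(|| String::from("Token is expired or doesn't exist."));
        // Sending only fails if the receiver was dropped; nothing to do then.
        let _ = sender.send(result);
    });

    // Blocks until the worker has sent its result (or ended without sending).
    receiver
        .recv()
        .unwrap_or_else(|_| Err(String::from("Lookup task ended without a result.")))
}
```

Calling lookup_user_id("token-1") goes through the channel and yields the stored user ID, while an unknown token yields the error message. Blocking on recv is acceptable in this threaded sketch, but inside an event loop the receiving side would need to be chained as a future rather than waited on.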
