snapview / tokio-tungstenite Goto Github PK
View Code? Open in Web Editor NEWFuture-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
License: MIT License
Future-based Tungstenite for Tokio. Lightweight stream-based WebSocket implementation
License: MIT License
Could you provide some information that describes the advantages of tokio-tungstenite
(compared to websocket
, ws
, tk-http
)?. What was your motivation to build a new library instead of using an existing?
Many thanks in advance!
Awesome library here! I've been working on a web framework that uses this for websockets, and while getting close to a viable release, it turns out I can't release with a git dependency :D Specifically, being able to use the new tokio crate, and construct a websocket stream after having done my own handshake. Does it seem like 0.6 would be possible soon?
Hello!
I'm looking a way to extract headers after handshake, between client and server. I'd like to make so because going check for a token in request, and if it doesn't exist, then return an error and shutdown the connection until token will be specified.
Does it possible to do something like this (part of this was taken from an example):
accept_async(stream)
.and_then(move |ws_stream| {
let req = Request::try_parse(ws_stream).unwrap();
println!("Received a new ws handshake");
println!("The request's path is: {}", req.path);
println!("The request's headers are:");
for &(ref header, _ /* value */) in req.headers.iter() {
println!("* {}", header);
}
Ok(ws_stream)
})
.and_then(move |ws_stream| {
...
})
});
instead of implementing a special callback structure with Callback trait and using it with accept_hdr_async?
Because if I'm trying to do this via accept_hdr_async
and send the message with code like this:
let auth_middleware_callback = |request: &Request| {
let auth_middleware_inner = self.auth_middleware.clone();
let processing_result = auth_middleware_inner.borrow().process_request(request);
match processing_result {
Ok(_) => Ok(None),
Err(err) => {
let formatted_error = format!("{}", err);
let message = formatted_error.into_bytes();
let (_, out) = stream.split();
use futures::Stream;
out.write_all(message).unwrap();
out.flush();
Err(TungsteniteError::Http(401))
}
}
};
Leads to the following error:
error[E0599]: no method named `split` found for type `tokio_core::net::TcpStream` in the current scope
--> src/proxy.rs:65:50
|
65 | let (_, out) = stream.split();
| ^^^^^
|
= note: the method `split` exists but the following trait bounds were not satisfied:
`tokio_core::net::TcpStream : futures::Stream`
= help: items from traits can only be used if the trait is in scope
= note: the following traits are implemented but not in scope, perhaps add a `use` for one of them:
candidate #1: `use std::io::BufRead;`
candidate #2: `use futures::Stream;`
candidate #3: `use tokio_core::io::Io;`
candidate #4: `use tokio_io::AsyncRead;`
error: aborting due to previous error
An another reason why I want to do this, it is provide an access to customize the behavior for reverse proxy. Like if was specified -v
/--validate
option, then necessary to check and validate passed token after establishing a connection. Otherwise don't do an additional work.
Hello!
I'm trying to append headers for a client, which is working with WebSocket connection, but he's receiving the following error in console of my browser, when creating a new connection between the client and server:
WebSocket connection to 'ws://127.0.0.1:8080/' failed: Error during WebSocket handshake: Sent non-empty 'Sec-WebSocket-Protocol' header but no response was received
For the client side I'm using a simple embedded JavaScript code, which is the part of HTML, for testing a reverse proxy functionality that have written with tokio-tungstenite:
var ws = null;
var isopen = false;
window.onload = function() {
ws = new WebSocket("ws://127.0.0.1:8080", "token");
ws.onopen = function() {
console.log("Connected!");
isopen = true;
};
ws.onmessage = function(e) {
console.log(e.data);
};
ws.onclose = function(e) {
console.log("Connection closed.");
ws = null;
isopen = false;
}
};
function sendOnStaticURL() {
if (isopen) {
ws.send(JSON.stringify({
'content': {'test': 'value'}
'url': '/api/matchmaking/search'
}));
console.log("sendOnStaticURL() clicked.");
} else {
console.log("Connection not opened.")
}
}
So, the question here is: Does the tokio-tungstenite library let me to add values in "Sec-WebSocket-Protocol" header? Or it doesn't supported yet and I should to find an another way to provide a token (like var ws = new WebSocket("ws://example.com/?token=testtoken
)?
I'm attempting to implement websockets for the rumqtt MQTT library and I'm running into some issues using this crate. My main problem is mentioned here:
AtherEnergy/rumqtt#70
But what I'd like to do is create an instance of tokio_core::Framed (to keep consistency with the rest of rumqtt) from a WebSocketStream. I am unsure how I could implement Read / AsyncRead for WebSocketStream, and was wondering if you had any suggestions?
I can't really see a way to do it with the current interfaces, since as far as I can tell, WebSocketStream is essentially equivalent to Framed, but without the ability to add encoders and decoders
I'm extremely new to tokio, so forgive me if I've completely misunderstood anything here.
If I use tokio-tungstenite as a client, my application seems to reply to pings from the server but I was wondering if tokio-tungstenite (as a client) supported sending pings and dropping the connection on ping timeouts.
I'm not sure if you want this to be tokio-tungstenite's job or the job of the users but I think the users' code would be a lot simpler if there was pings support in tokio-tungstenite.
Hello,
Recently the tokio crate was reformed, and when you're trying to build an application or library with tokio crate (instead of tokio-core, and so on), it leads to breaking your actual codebase.
In my case with an application, when I had tried to update dependencies in the Cargo.toml
file and re-build the project, I've got the following error:
error[E0308]: mismatched types
--> src\proxy.rs:63:52
|
63 | let server = listener.incoming().for_each(|(stream, addr)| {
| ^^^^^^^^^^^^^^ expected struct `tokio::net::TcpStream`, found tuple
|
= note: expected type `tokio::net::TcpStream`
found type `(_, _)`
Does the tokio-tungstenite
crate have any plans in migrating to the new tokio crate?
I got your example for connecting to a websocket server working, but I need to connect to multiple servers and am stuck. I am very new to rust and I'm not sure how to pass multiple things to tokio::run. I think I need to use tokio's join_all and maybe future's lazy, but am not sure exactly how and cannot find an example.
What is the best way to change https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs to create multiple connections to servers with different APIs?
Does tokio-tungstenite has something to close the connection gracefully or does the users need to handle it (I guess with a Close frame and a timeout)?
Hello!
I'm trying to figure out with writing my own server with this crate and if I'm doing like in the code below:
pub struct Proxy {
router: Box<Router>,
}
impl Proxy {
pub fn new(router: Box<Router>) -> Proxy {
Proxy {
router: router
}
}
pub fn run(&self, address: SocketAddr) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&address, &handle).unwrap();
println!("Listening on: {}", address);
// The server loop.
let server = socket.incoming().for_each(|(stream, addr)| {
let handle_inner = handle.clone();
// Handler per each connection.
accept_async(stream).and_then(move |ws_stream| {
println!("Checking the user's token");
Ok(ws_stream)
}).for_each(|ws_stream| { // <--- Handling each incoming message separately?
println!("Processing...");
Ok(())
})
});
// Run the server
core.run(server).unwrap();
}
}
Then getting the following error:
relrin at relrin in ~/Code/pathfinder/pathfinder exited 101 on git:master+
=> cargo run -- -c ./tests/files/config_with_valid_endpoints.yaml
Compiling pathfinder v0.1.0 (file:///home/relrin/Code/pathfinder/pathfinder)
error[E0599]: no method named `for_each` found for type `futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]>` in the current scope
--> src/proxy.rs:41:16
|
41 | }).for_each(|ws_stream| {
| ^^^^^^^^
|
= note: the method `for_each` exists but the following trait bounds were not satisfied:
`futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]> : futures::Stream`
`futures::AndThen<tokio_tungstenite::AcceptAsync<tokio_core::net::TcpStream, tungstenite::handshake::server::NoCallback>, std::result::Result<tokio_tungstenite::WebSocketStream<tokio_core::net::TcpStream>, tungstenite::Error>, [closure@src/proxy.rs:38:43: 41:14]> : std::iter::Iterator`
error: aborting due to previous error
So, my question here is how to make this case possible for using/implementing on the server-side? As far as I understand, the tokio-tungstenite crate don't let me to work at this way, because it presuming that one connection is equal one message, right?
It will be great, when the server can hold the connection with the client and will processing as much as possible message from him until he will disconnected manually (or server closed it).
tokio-tungstenite/src/connect.rs
Line 73 in 9740711
I think tokio::io
would be correct. Changing it leads to this:
--> /home/mk/dev/tokio-tungstenite/src/connect.rs:124:5
|
124 | / Box::new(wrap_stream(stream, domain, mode)
125 | | .and_then(|mut stream| {
126 | | NoDelay::set_nodelay(&mut stream, true)
127 | | .map(move |()| stream)
128 | | .map_err(|e| e.into())
129 | | })
130 | | .and_then(move |stream| client_async(request, stream)))
| |_______________________________________________________________________^ `futures::Future<Error=tungstenite::Error, Item=S>` cannot be sent between threads safely
In some conditions i'm getting Http(429) error code, but i need to get additional headers of response like Retry-After, how could i make it?
I would like to move the call below to a event_loop(), so I could properly match the connection errors,
https://github.com/wisespace-io/binance-rs/blob/master/src/websockets.rs#L66
however I didn't figure out what would be the return type in the call below
https://github.com/wisespace-io/binance-rs/blob/master/src/websockets.rs#L39
subj
It seems more common for futures::Stream
to return Ok(Async::Ready(None))
instead of an Err
when the stream is closed. Perhaps the impl Stream
in this crate should map Error::ConnectionClosed
to just None
?
To reproduce, use the Simple WebSocket Client chrome extension and connect then disconnect.
> RUST_BACKTRACE=full cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.0 secs
Running `target/debug/test-tung-server`
Listening on: 127.0.0.1:8080
New WebSocket connection: 127.0.0.1:50700
thread 'main' panicked at 'assertion failed: `(left == right)` (left: `1`, right: `0`)', /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:26
stack backtrace:
0: 0x55667b188153 - std::sys::imp::backtrace::tracing::imp::unwind_backtrace::h0c49f46a3545f908
at /checkout/src/libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
1: 0x55667b1843b4 - std::sys_common::backtrace::_print::hcef39a9816714c4c
at /checkout/src/libstd/sys_common/backtrace.rs:71
2: 0x55667b18a287 - std::panicking::default_hook::{{closure}}::h7c3c94835e02f846
at /checkout/src/libstd/sys_common/backtrace.rs:60
at /checkout/src/libstd/panicking.rs:355
3: 0x55667b189e0b - std::panicking::default_hook::h0bf7bc3112fb107d
at /checkout/src/libstd/panicking.rs:371
4: 0x55667b18a6fb - std::panicking::rust_panic_with_hook::ha27630c950090fec
at /checkout/src/libstd/panicking.rs:549
5: 0x55667b18a5d4 - std::panicking::begin_panic::heb97fa3158b71158
at /checkout/src/libstd/panicking.rs:511
6: 0x55667b18a509 - std::panicking::begin_panic_fmt::h8144403278d84748
at /checkout/src/libstd/panicking.rs:495
7: 0x55667b0f723e - tungstenite::protocol::frame::frame::apply_mask_aligned32::h2e9587b64b443a55
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:26
8: 0x55667b09bfa6 - tungstenite::protocol::frame::frame::Frame::remove_mask::{{closure}}::h464b933611967a31
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:224
9: 0x55667b0a3fc5 - <core::option::Option<T>>::and_then::hc6cbc7d04a997358
at /checkout/src/libcore/option.rs:587
10: 0x55667b09bf35 - tungstenite::protocol::frame::frame::Frame::remove_mask::h7d7a272641f1a4d0
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/frame/frame.rs:221
11: 0x55667b0c71e6 - <tungstenite::protocol::WebSocket<Stream>>::read_message_frame::hdce73b2af0b2291f
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/mod.rs:197
12: 0x55667b0c4939 - <tungstenite::protocol::WebSocket<Stream>>::read_message::he6e38db2dd59fe22
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tungstenite-0.2.0/src/protocol/mod.rs:93
13: 0x55667b0e25d4 - <tokio_tungstenite::WebSocketStream<T> as futures::stream::Stream>::poll::hdfde1f9e11380517
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.1.2/src/lib.rs:97
14: 0x55667b0e285a - <futures::stream::split::SplitStream<S> as futures::stream::Stream>::poll::hb4a6de1455b07ea8
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/stream/split.rs:14
15: 0x55667b097913 - <futures::stream::for_each::ForEach<S, F, U> as futures::future::Future>::poll::hdc98d641d3de2017
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/stream/for_each.rs:45
16: 0x55667b0e21f3 - <futures::future::map::Map<A, F> as futures::future::Future>::poll::h8cdad3f417638717
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/map.rs:30
17: 0x55667b0e4516 - <futures::future::map_err::MapErr<A, F> as futures::future::Future>::poll::h8c67f452f112d092
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/map_err.rs:30
18: 0x55667b0e2efa - <futures::future::select::Select<A, B> as futures::future::Future>::poll::h57c6ee48948d3aac
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/select.rs:48
19: 0x55667b0ceac5 - <futures::future::chain::Chain<A, B, C>>::poll::h6a75eceb7124d4aa
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/chain.rs:26
20: 0x55667b0e535c - <futures::future::then::Then<A, B, F> as futures::future::Future>::poll::h5def7d01a5ad2636
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/then.rs:32
21: 0x55667b13e551 - <alloc::boxed::Box<F> as futures::future::Future>::poll::hb3767344ebe98fd2
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/future/mod.rs:106
22: 0x55667b127c7c - <futures::task_impl::Spawn<F>>::poll_future::{{closure}}::h48649b1ca7a1bffa
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:337
23: 0x55667b12807e - <futures::task_impl::Spawn<T>>::enter::{{closure}}::hd669513906380924
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:484
24: 0x55667b140843 - futures::task_impl::set::{{closure}}::hf10679f8e41b220c
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:61
25: 0x55667b1298c7 - <std::thread::local::LocalKey<T>>::with::h74b2509b934fff98
at /checkout/src/libstd/thread/local.rs:253
26: 0x55667b1405ef - futures::task_impl::set::h50f076371bd50260
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:54
27: 0x55667b127fce - <futures::task_impl::Spawn<T>>::enter::hec76d727940c9640
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:484
28: 0x55667b127c07 - <futures::task_impl::Spawn<F>>::poll_future::hb5a04041cb6e0824
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.12/src/task_impl/mod.rs:337
29: 0x55667b14c322 - tokio_core::reactor::Core::dispatch_task::{{closure}}::h49b3dd2d7da2a25e
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:366
30: 0x55667b124194 - <scoped_tls::ScopedKey<T>>::set::hdf6c715dc2147ce3
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
31: 0x55667b14be68 - tokio_core::reactor::Core::dispatch_task::h2ca8f82a3a2b061c
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:366
32: 0x55667b14b3b5 - tokio_core::reactor::Core::dispatch::h0252fa03a28cc3c2
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:324
33: 0x55667b14af1b - tokio_core::reactor::Core::poll::ha5e0c2568b9e79a4
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:312
34: 0x55667b09a2a8 - tokio_core::reactor::Core::run::he9daa5ab6dd0f82d
at /home/bbigras/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-core-0.1.6/src/reactor/mod.rs:249
35: 0x55667b0e5974 - test_tung_server::main::hc8b7e99d2f69155d
at /home/bbigras/dev/rust/tests/test-tung-server/src/main.rs:104
36: 0x55667b18a3f5 - std::panicking::try::do_call::h689a21caeeef92aa
at /checkout/src/libcore/ops.rs:2606
at /checkout/src/libstd/panicking.rs:454
37: 0x55667b1915da - __rust_maybe_catch_panic
at /checkout/src/libpanic_unwind/lib.rs:98
38: 0x55667b18ae9a - std::rt::lang_start::hf63d494cb7dd034c
at /checkout/src/libstd/panicking.rs:433
at /checkout/src/libstd/panic.rs:361
at /checkout/src/libstd/rt.rs:57
39: 0x55667b0e7302 - main
40: 0x7f506f505510 - __libc_start_main
41: 0x55667b096159 - _start
42: 0x0 - <unknown>
rustc 1.17.0-nightly (7846dbe0c 2017-03-26)
futures 0.1.12
tokio-core 0.1.6
tokio-tungstenite 0.1.2
tungstenite 0.2.0
It's weird, tokio-tungstenite wont connect while others ws client do. It seems fine if I use tungstenite-rs directly.
I'm not sure if I'm not using tokio-tungstenite or if my nginx config is too strict.
ssl labs also says that "This site works only in browsers with SNI support."
I'm basically using examples/client.rs with a "wss://" URL.
nginx SSL config:
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
ssl_ciphers 'ECDHE-ECDSA-CHACHA20-POLY1305:ECDHE-RSA-CHACHA20-POLY1305:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES128-GCM-SHA256:DHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-SHA256:ECDHE-RSA-AES128-SHA256:ECDHE-ECDSA-AES128-SHA:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-AES128-SHA:ECDHE-ECDSA-AES256-SHA384:ECDHE-ECDSA-AES256-SHA:ECDHE-RSA-AES256-SHA:DHE-RSA-AES128-SHA256:DHE-RSA-AES128-SHA:DHE-RSA-AES256-SHA256:DHE-RSA-AES256-SHA:ECDHE-ECDSA-DES-CBC3-SHA:ECDHE-RSA-DES-CBC3-SHA:EDH-RSA-DES-CBC3-SHA:AES128-GCM-SHA256:AES256-GCM-SHA384:AES128-SHA256:AES256-SHA256:AES128-SHA:AES256-SHA:DES-CBC3-SHA:!DSS';
Hi!
I'm trying to write an error processing for some corner cases, with the code like this:
let server = socket.incoming().for_each(|(stream, _addr)| {
let proxy_inner = self;
// Handler per each connection.
accept_async(stream)
.map_err(|err| {
println!("Occurred an error during the WebSocket handshake: {}", err);
Error::new(ErrorKind::Other, err)
})
// Check the Auth header
.and_then(move |ws_stream| {
println!("Checking the user's token");
Ok(ws_stream)
})
// Process the messages
.and_then(move |ws_stream| {
let (mut sink, stream) = ws_stream.split();
// Per each message
stream.for_each(move |message: Message| {
// Convert an incoming message into JSON
let text_message = try!(message.into_text());
let parsed_json = try!(proxy_inner.decode_message(text_message.as_str()));
// Convert the specified API url to a Kafka topic
let json_message = try!(proxy_inner.validate_json(parsed_json));
let microservice = proxy_inner.match_microservice(json_message);
Ok(())
})
// Validating errors
.or_else(|err| {
println!("Occurred error during processing a message: {}", err);
// sink.start_send(Message::Text(format!("{}", err))); // <-- sink does not live long enough, if we will try so
Ok(())
})
}).or_else(|err| {
// Convert an error here to something human-readable
println!("Occurred error with the WebSocket connection: {}", err);
Ok(())
})
});
And it works, but when an error raised inside this, in terminal it looks not so convenient (at least for understand what's going on):
Listening on: 127.0.0.1:8080
Checking the user's token
Occurred error during the processing a message: Connection closed: (1001) // <-- Page with JS + WebSocket was reloaded by a user manually
So, the raised questions here are:
or_else
block with sink or is better to make it like in the example of tokio-tungstenite? If the inner block will contain the sending an error details response via sink (like in the example), will it be good when we have a couple message simultaneously?I want to adapt the https://github.com/snapview/tokio-tungstenite/blob/46dfd9ed3ee75b0261e9f5f71c8e70492407248b/examples/autobahn-server.rs to my use case. In addition to echoing every message the server should send a message to the client every second.
The new handle_connection
function would look like this:
async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {
let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
info!("New WebSocket connection: {}", peer);
let mut msg_fut = ws_stream.next();
loop {
let delay_fut = delay_for(Duration::from_millis(1000));
match select(msg_fut, delay_fut).await {
Either::Left((msg, _)) => {
match msg {
Some(msg) => {
let msg = msg?;
if msg.is_text() || msg.is_binary() {
ws_stream.send(msg).await?;
}
msg_fut = ws_stream.next();
},
None => {
break;
},
};
},
Either::Right((_, msg_fut_reuse)) => {
msg_fut = msg_fut_reuse;
// Next line error message:
//
// error[E0499]: cannot borrow `ws_stream` as mutable more than once at a time
// --> src\main.rs:48:17
// |
// 25 | let mut msg_fut = ws_stream.next();
// | --------- first mutable borrow occurs here
// ...
// 29 | match select(msg_fut, delay_fut).await {
// | ------- first borrow later used here
// ...
// 48 | ws_stream.send(Message::Text("hello".to_owned())).await?;
// | ^^^^^^^^^ second mutable borrow occurs here
ws_stream.send(Message::Text("hello".to_owned())).await?;
},
}
}
Ok(())
}
Sadly this does not compile. I have tried to come to an agreement with the borrow checker but I cannot figure out how to reuse msg_fut_reuse
for the next iteration of the loop.
Can anyone help me with this? I would gladly contribute it as an example later.
Reproduce:
In browser console: var exampleSocket = new WebSocket("ws://127.0.0.1:8080");
Server logs:
thread 'main' panicked at 'Error during the websocket handshake occurred: Io(Custom { kind: Other, error: "No \"Connection: upgrade\" in client request" })', src/libcore/result.rs:1192:5
stack backtrace:
0: backtrace::backtrace::libunwind::trace
at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
1: backtrace::backtrace::trace_unsynchronized
at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
2: std::sys_common::backtrace::_print_fmt
at src/libstd/sys_common/backtrace.rs:77
3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
at src/libstd/sys_common/backtrace.rs:59
4: core::fmt::write
at src/libcore/fmt/mod.rs:1057
5: std::io::Write::write_fmt
at src/libstd/io/mod.rs:1426
6: std::sys_common::backtrace::_print
at src/libstd/sys_common/backtrace.rs:62
7: std::sys_common::backtrace::print
at src/libstd/sys_common/backtrace.rs:49
8: std::panicking::default_hook::{{closure}}
at src/libstd/panicking.rs:195
9: std::panicking::default_hook
at src/libstd/panicking.rs:215
10: std::panicking::rust_panic_with_hook
at src/libstd/panicking.rs:463
11: rust_begin_unwind
at src/libstd/panicking.rs:371
12: core::panicking::panic_fmt
at src/libcore/panicking.rs:85
13: core::result::unwrap_failed
at src/libcore/result.rs:1192
14: core::result::Result<T,E>::expect
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libcore/result.rs:991
15: echo_server::accept_connection::{{closure}}
at examples/echo-server.rs:42
16: <std::future::GenFuture<T> as core::future::future::Future>::poll
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/future.rs:43
17: tokio::task::core::Core<T>::poll
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/core.rs:128
18: tokio::task::harness::Harness<T,S>::poll::{{closure}}::{{closure}}
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:120
19: core::ops::function::FnOnce::call_once
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libcore/ops/function.rs:232
20: <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panic.rs:318
21: std::panicking::try::do_call
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panicking.rs:296
22: __rust_maybe_catch_panic
at src/libpanic_unwind/lib.rs:79
23: std::panicking::try
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panicking.rs:272
24: std::panic::catch_unwind
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/panic.rs:394
25: tokio::task::harness::Harness<T,S>::poll::{{closure}}
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:101
26: tokio::loom::std::causal_cell::CausalCell<T>::with_mut
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/loom/std/causal_cell.rs:41
27: tokio::task::harness::Harness<T,S>::poll
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/harness.rs:100
28: tokio::task::raw::poll
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/raw.rs:162
29: tokio::task::raw::RawTask::poll
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/raw.rs:113
30: tokio::task::Task<S>::run
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/task/mod.rs:381
31: tokio::runtime::basic_scheduler::SchedulerPriv::tick
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/basic_scheduler.rs:192
32: tokio::runtime::basic_scheduler::BasicScheduler<P>::block_on
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/basic_scheduler.rs:142
33: tokio::runtime::Runtime::block_on::{{closure}}
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/mod.rs:411
34: tokio::runtime::context::enter
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/context.rs:72
35: tokio::runtime::handle::Handle::enter
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/handle.rs:34
36: tokio::runtime::Runtime::block_on
at /home/vlad/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.11/src/runtime/mod.rs:408
37: echo_server::main
at examples/echo-server.rs:17
38: std::rt::lang_start::{{closure}}
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
39: std::rt::lang_start_internal::{{closure}}
at src/libstd/rt.rs:52
40: std::panicking::try::do_call
at src/libstd/panicking.rs:296
41: __rust_maybe_catch_panic
at src/libpanic_unwind/lib.rs:79
42: std::panicking::try
at src/libstd/panicking.rs:272
43: std::panic::catch_unwind
at src/libstd/panic.rs:394
44: std::rt::lang_start_internal
at src/libstd/rt.rs:51
45: std::rt::lang_start
at /rustc/85976442558bf2d09cec3aa49c9c9ba86fb15c1f/src/libstd/rt.rs:67
46: main
47: __libc_start_main
48: _start
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
If I await tokio::time::timeout(web_socket_stream.next())
, and the timeout happens before the socket receives a message (and so the web_socket_stream.next()
future gets dropped), would this lead to skipping a message? Or is it OK to keep polling web_socket_stream.next()
and ignore the result as long as it does not return Poll::Ready
?
I have a load test client that is doing something like
connect_async(wsaddr)
.await
.expect("problems connecting to ws");
I'm running my client and I see panics such as:
panicked at 'problems connecting to ws: Io(Custom { kind: Other, error: "" })'
but I'd like to understand what is the root problem of it.
is there any way to capture/print/see the original lower level error that may have caused this?
client_async() allows now to let the user initiate the WebSocket handshake with extra headers. Similar support could be added to server side too, at least the ability to verify protocol would be great.
Hello, I'm new to Rust so I'm probably just a bit confused but I was looking through the docs for what appears to be the latest version of tokio-tungstenite on crates.io (v0.6.0), specifically the source for WebSocketStream in lib.rs (https://docs.rs/tokio-tungstenite/0.6.0/src/tokio_tungstenite/lib.rs.html#162-164), and it appears that the code for WebSocketStream is a fair bit different to that on the GitHub master branch. For example, comparing the struct and the poll function in 'impl Stream for WebSocketStream':
(crates.io source)
pub struct WebSocketStream<S> {
inner: WebSocket<S>,
}
(GitHub source)
pub struct WebSocketStream<S> {
inner: WebSocket<S>,
stream_ended: bool,
}
(crates.io source)
fn poll(&mut self) -> Poll<Option<Message>, WsError> {
self.inner.read_message().map(|m| Some(m)).to_async()
}
(GitHub source)
fn poll(&mut self) -> Poll<Option<Message>, WsError> {
if self.stream_ended {
self.stream_ended = false;
return Ok(Async::Ready(None))
}
self.inner.read_message().map(|m| {
if m.is_close() {
self.stream_ended = true;
}
Some(m)
}).to_async()
}
At least for me I can confirm that when I add 'tokio-tungstenite = "*"' to my Cargo.toml it uses the code that I can see on crates.io. Is this the case for anyone else?
(Edit: changed format for code blocks)
I'm sure I'm doing something incorrectly but cannot find the issue. I'm trying to compile the simple client.rs and server.rs example but receiving a bunch of errors.
I'm assuming I'm using the wrong version of one of the dependencies?
Cargo.toml:
[package]
name = "game"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
log = "0.4"
futures = "0.3.1"
pin-project = "0.4"
tokio = { version = "0.2", features = ["full"] }
url="*"
tungstenite="*"
tokio-tungstenite="*"
env_logger = "0.7"
And here's the errors I'm receiving:
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
--> src/server.rs:46:53
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| ^^^^^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
::: /home/foo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.9.0/src/lib.rs:114:8
|
114 | S: AsyncRead + AsyncWrite,
| --------- required by this bound in `tokio_tungstenite::accept_async`
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
--> src/server.rs:46:53
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| ^^^^^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
::: /home/foo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-tungstenite-0.9.0/src/lib.rs:114:20
|
114 | S: AsyncRead + AsyncWrite,
| ---------- required by this bound in `tokio_tungstenite::accept_async`
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
--> src/server.rs:46:21
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
= note: required by `tokio_tungstenite::AcceptAsync`
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
--> src/server.rs:46:21
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
= note: required by `tokio_tungstenite::AcceptAsync`
error[E0277]: the trait bound `tokio_tungstenite::AcceptAsync<tokio::net::tcp::stream::TcpStream, tungstenite::handshake::server::NoCallback>: core::future::future::Future` is not satisfied
--> src/server.rs:46:21
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| _____________________^
47 | | .await
| |______________^ the trait `core::future::future::Future` is not implemented for `tokio_tungstenite::AcceptAsync<tokio::net::tcp::stream::TcpStream, tungstenite::handshake::server::NoCallback>`
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_read::AsyncRead` is not satisfied
--> src/server.rs:46:21
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| _____________________^
47 | | .await
| |______________^ the trait `tokio_io::async_read::AsyncRead` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
= note: required by `tokio_tungstenite::AcceptAsync`
error[E0277]: the trait bound `tokio::net::tcp::stream::TcpStream: tokio_io::async_write::AsyncWrite` is not satisfied
--> src/server.rs:46:21
|
46 | let ws_stream = tokio_tungstenite::accept_async(raw_stream)
| _____________________^
47 | | .await
| |______________^ the trait `tokio_io::async_write::AsyncWrite` is not implemented for `tokio::net::tcp::stream::TcpStream`
|
= note: required by `tokio_tungstenite::AcceptAsync`
error[E0277]: the trait bound `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send: std::marker::Unpin` is not satisfied
--> src/client.rs:31:26
|
31 | let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
| ^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::marker::Unpin` is not implemented for `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send`
|
= note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send>`
error[E0277]: the trait bound `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send: core::future::future::Future` is not satisfied
--> src/client.rs:31:26
|
31 | let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
| ^^^^^^^^^^^^^^^^^^^^^^^^ the trait `core::future::future::Future` is not implemented for `dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send`
|
= note: required because of the requirements on the impl of `core::future::future::Future` for `std::boxed::Box<dyn futures::future::Future<Item = (tokio_tungstenite::WebSocketStream<tokio_tungstenite::stream::Stream<tokio_tcp::stream::TcpStream, tokio_tls::TlsStream<tokio_tcp::stream::TcpStream>>>, tungstenite::handshake::client::Response), Error = tungstenite::error::Error> + std::marker::Send>`
error: aborting due to 9 previous errors
I have some tokio-tungstenite
code that's using the latest commit on master and it works beautifully on my Macbook and my CI for Windows passes just fine. The problem is that it seems like the reader never receives any messages!
My code for repro can be found here and it's loosely based on the pattern described in examples/client.rs
.
I understand that I'm using the latest commit on master and I'm not expecting this to be stable but wanted to get it on the radar :) Please let me know if there's anything else I can do to help with this!
Looking at #6 it looks like pings should be already supported and responded to when using tungstenite as a client.
Is that correct?
I wrote a ws client and I see my server sending a Ping but never getting a response from the client.
Is there any way I can confirm pings are being replied to?
Is there a "connect and wait forever" sample client I can look at to check what am I doing wrong?
Issue is most likely on my side; if you have any suggestions on how to troubleshoot I'd appreciate.
Hello!
I'm trying to implement checks for a token, that was specified in headers, but before doing an actual work I'd like to send a request to an existing Redis instance via an additional future (with redis-async crate at this case). The prepared future is invoking inside of the auth_middleware_callback
callback (which is ... nested and leads to potential "callback hell"?). But stuck with two certain things:
So, actual code looks like this:
let server = socket.incoming().for_each(|(stream, addr)| {
let engine_inner = self.engine.clone();
let connections_inner = self.connections.clone();
let handle_inner = handle.clone();
let auth_middleware_callback = |request: &Request| {
let handle_inner = handle.clone();
let auth_middleware_inner = self.auth_middleware.clone();
// Inside of this middleware we're checking and validating a specified token
let processing_result = auth_middleware_inner.borrow().process_request(request, &handle_inner);
match processing_result {
Ok(headers) => Ok(headers),
Err(err) => {
let formatted_error = format!("{}", err);
let message = Cow::from(formatted_error);
Err(TungsteniteError::Protocol(message))
}
}
};
accept_hdr_async(stream, auth_middleware_callback)
// Process the messages
.and_then(move |ws_stream| {
// other code here...
The piece of work, that polling the Redis instance (yes, I know that is bad approach to get a value from the finished future (mostly it's for debugging proposes)):
fn get_user_id(&self, raw_token: String, handle: &Handle) -> Result<String> {
let redis_socket_address = self.redis_address.parse().unwrap();
let redis_connection = paired_connect(&redis_socket_address, handle);
// Make the authentication before, if a password was specified.
let get_user_id_future = redis_connection.and_then(move |connection| {
// we will get here Ok("some user ID") if everything is good
connection.send::<String>(resp_array!["GET", raw_token])
});
let mut processing_result;
handle.spawn(get_user_id_future.then(move |response| {
processing_result = match response {
Ok(user_id) => Ok(String::from(user_id)),
Err(_) => {
let message = String::from("Token is expired or doesn't exist.");
Err(PathfinderError::AuthenticationError(message))
}
};
Ok(())
}));
processing_result
}
So, I have a couple of questions around this stuff:
let auth_middleware_callback = |request: &Request| {
let handle_inner = handle.clone();
let auth_middleware_inner = self.auth_middleware.clone();
let redis_future = auth_middleware_inner.borrow().process_request(request, &handle_inner);
redis_future
};
accept_hdr_async(stream, auth_middleware_callback)
// extract a value from redis_future
.and_then((ws_stream, redis_future) {
// ...
})
// Process the messages
.and_then(move |ws_stream| {
Or in this case I should to go an another way (may be use specify token in the message, and process in inside or accept_async
)?
accept_async
, then can I return in the same time ws_stream and redis_future simultaneously? Or it possible so solve this issue somehow differently?In server mode, is it possible to get the URL of the request? I'm interested in the path part, like "/something".
I need to send along some authentication headers along with a request to instantiate a websocket connection, is there a way I can do this?
When I try https://github.com/snapview/tungstenite-rs/blob/master/examples/client.rs, I got data from the server but not when I try https://github.com/snapview/tokio-tungstenite/blob/master/examples/client.rs using the same URL.
I don't see anything after the handshake.
addr: V4(127.0.0.1:3012)
WebSocket handshake has been successfully completed
Linux x86_64
rustc 1.17.0-nightly (7846dbe0c 2017-03-26)
tokio-tungstenite 0.1.0
tungstenite 0.1.1
I found out the hard way that accept_async
is not only returning a future that handles the handshake, but it immediately accesses the async i/o object to start the handshake and returns a future to complete it.
This means that accept_async
must be called from a task context, which I think isn't obvious and maybe should be stated more clearly.
In a pure futures 0.1 application, this most probably doesn't matter since accept_async
is called when the accepted TcpStream is available, which pretty much always happens inside a task. However if you're using futures 0.3, you need to wrap a future 0.1 in a compatibility wrapper to use it with 0.3 and a current task is only provided to the future while polling it. I found out the hard way that this currently doesn't work well with tokio-tungstenite (see rust-lang/futures-rs#1463).
I'd like to propose to change the accept functions to only return a future and do the actual handshake when the future gets polled. That also seems to be the way everybody else handles it (async fns, futures-rs functions and methods, any other crate I can remember) and is probably the most expected way and would make it work smoothly with futures 0.3 compatibility wrappers as far as I can see.
I know that there's been big changes since Tokio moving point releases, but thought I'd point out the obvious that examples/client.rs
no longer compiles:
error[E0432]: unresolved import `futures::sync`
use futures::sync::mpsc;
error[E0425]: cannot find function `run` in module `tokio::runtime`
tokio::runtime::run(client.map_err(|_e| ()));
error: the `and_then` method cannot be invoked on a trait object
let client = connect_async(url).and_then(move |(ws_stream, _)| {
I'm not sure if the problem is with tokio-tungstenite or with tokio-timer.
To reproduce, run examples/server.rs
with an extra extern crate tokio_timer;
.
result:
> cargo build
Compiling test-tung-server v0.1.0 (file:///home/bbigras/dev/rust/tests/test-tung-server)
error[E0283]: type annotations required: cannot resolve `(): std::convert::From<_>`
--> src/main.rs:77:36
|
77 | let ws_writer = rx.fold(sink, |mut sink, msg| {
| ^^^^
error: aborting due to previous error
error: Could not compile `test-tung-server`.
To learn more, run the command again with --verbose.
rustc 1.17.0-nightly (7846dbe0c 2017-03-26)
futures 0.1.12
tokio-core 0.1.6
tokio-timer 0.1.1
tokio-tungstenite 0.1.2
tungstenite 0.2.1
First Blood!!
Need to update this crate with the new tokio-io library.
Here are teh warnings I'm getting,
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:31:5
|
31 | use tokio_core::io::Io;
| ^^^^^^^^^^^^^^^^^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:53:24
|
53 | pub fn client_async<S: Io>(url: Url, stream: S) -> ConnectAsync<S> {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:72:24
|
72 | pub fn accept_async<S: Io>(stream: S) -> AcceptAsync<S> {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:93:48
|
93 | impl<T> Stream for WebSocketStream<T> where T: Io {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:102:46
|
102 | impl<T> Sink for WebSocketStream<T> where T: Io {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:122:9
|
122 | impl<S: Io> Future for ConnectAsync<S> {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:137:9
|
137 | impl<S: Io> Future for AcceptAsync<S> {
| ^^
|
= note: #[warn(deprecated)] on by default
warning: use of deprecated item: moved to the `tokio-io` crate
--> src/lib.rs:150:9
|
150 | impl<S: Io, R: HandshakeRole> Future for MidHandshake<S, R> {
| ^^
|
= note: #[warn(deprecated)] on by default
Finished dev [unoptimized + debuginfo] target(s) in 19.26 secs
Seems to have been somewhat discussed in #4.
close
being async, it seems to me that it should only resolve once the remote has confirmed the close. This allows client code to await the close before dropping underlying network connection in order to close down properly.
In an async-await version this would imply storing the waker and waking up a task waiting for the close to complete.
Like as mentioned in the issue about headers, I'd decided to move a part of callback code to the future and using it later. But I'm getting errors, when trying to assign a custom lifetime to a code, where client headers is going to outlive futures, so that I could use it inside of futures (like, to check a token inside of the main processing part). After the moment when the client will end sending requests (or the connecting will be closed by the server by some certain reason), header should deleted.
Code:
pub fn run<'a: 'b, 'b>(&self, address: SocketAddr) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&address, &handle).unwrap();
println!("Listening on: {}", address);
let server = socket.incoming().for_each(|(stream, addr)| {
let engine_inner = self.engine.clone();
let connections_inner = self.connections.clone();
let auth_middleware_inner = self.auth_middleware.clone();
let handle_inner = handle.clone();
let headers: &'a Headers = &HashMap::new();
let copy_headers_callback = |request: &'b Request| {
// Just copy the headers as is
for &(ref name, ref value) in request.headers.iter() {
headers[&name.clone()] = value.clone()
}
Ok(None)
};
accept_hdr_async<'b>(stream, copy_headers_callback)
// Process the messages
.and_then(move |ws_stream| {
// Create a channel for the stream, which other sockets will use to
// send us messages. It could be used for broadcasting your data to
// another users in the future.
let (tx, rx) = mpsc::unbounded();
connections_inner.borrow_mut().insert(addr, tx);
// Split the WebSocket stream so that it will be possible to work
// with the reading and writing halves separately.
let (sink, stream) = ws_stream.split();
// Check headers, if we're working in a secured mode
let auth_future = auth_middleware_inner.borrow().process_request(headers, &handle_inner);
....
});
// Run the server
core.run(server).unwrap();
}
Errors:
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/proxy.rs:51:40
|
51 | let server = socket.incoming().for_each(|(stream, addr)| {
| ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
error[E0277]: the trait bound `(): futures::Future` is not satisfied
--> src/proxy.rs:119:14
|
119 | core.run(server).unwrap();
| ^^^ the trait `futures::Future` is not implemented for `()`
|
= note: required because of the requirements on the impl of `futures::IntoFuture` for `()`
= note: required because of the requirements on the impl of `futures::Future` for `futures::stream::ForEach<tokio_core::net::Incoming, [closure@src/proxy.rs:51:49: 116:10 self:_, handle:_], ()>`
Any ideas how it could be fixed?
code to reproduce:
use tokio_tungstenite::connect_async;
#[tokio::main]
async fn main() {
let url = url::Url::parse(&"wss://www.deribit.com/ws/api/v2").unwrap();
connect_async(url).await.expect("Failed to connect");
}
Compile with tls
the feature.
Full error
thread 'main' panicked at 'Failed to connect: Tls(Ssl(Error { code: ErrorCode(1), cause: Some(Ssl(ErrorStack([Error { code: 336031996, library: "SSL routines", function: "SSL23_GET_SERVER_HELLO", reason: "unknown protocol", file: "s23_clnt.c", line: 794 }]))) }, X509VerifyResult { code: 0, error: "ok" }))', src/libcore/result.rs:1188:5
It would be nice to be able to build without TLS. Tungstenite already supports it.
Can you add an example for tls server pls?
Hello,
When using Tungstenite Tokio version, the connection gets stuck. Based on a quick look to gdb backtrace, it may be a stuck in the receive side during connection handling. The main thread is blocked after this. Let me know if I can do anything to help to debug this.
0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
28 ../sysdeps/unix/sysv/linux/x86_64/recv.c: No such file or directory.
(gdb) info threads
Id Target Id Frame
* 1 Thread 0x7ffff7fe9840 (LWP 30558) "mles-client" 0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
2 Thread 0x7ffff65ff700 (LWP 30562) "mles-client" 0x00007ffff6c28e23 in epoll_wait () at ../sysdeps/unix/syscall-template.S:84
3 Thread 0x7ffff63fe700 (LWP 30563) "mles-client" 0x00007ffff6c28e23 in epoll_wait () at ../sysdeps/unix/syscall-template.S:84
(gdb) bt
#0 0x00007ffff711187f in __libc_recv (fd=17, buf=0x7ffff6663000, n=4096, flags=0) at ../sysdeps/unix/sysv/linux/x86_64/recv.c:28
#1 0x00005555555ba7f2 in std::sys::imp::net::{{impl}}::recv_with_flags () at /checkout/src/libstd/sys/unix/net.rs:161
#2 std::sys::imp::net::{{impl}}::read () at /checkout/src/libstd/sys/unix/net.rs:170
#3 std::sys_common::net::{{impl}}::read () at /checkout/src/libstd/sys_common/net.rs:230
#4 std::net::tcp::{{impl}}::read () at /checkout/src/libstd/net/tcp.rs:459
#5 0x00005555555a9197 in _$LT$mio..net..tcp..TcpStream$u20$as$u20$std..io..Read$GT$::read::hd22429470efbac49 ()
#6 0x000055555559d75a in _$LT$tokio_core..net..tcp..TcpStream$u20$as$u20$std..io..Read$GT$::read::he4646848c964c4b7 ()
#7 0x0000555555563c9c in tungstenite::input_buffer::InputBuffer::read_from::hdce1e509ab469976 ()
#8 0x000055555556e119 in _$LT$tungstenite..handshake..MidHandshake$LT$Stream$C$$u20$Role$GT$$GT$::handshake::h117e18082a560944 ()
#9 0x000055555555fb26 in _$LT$futures..stream..for_each..ForEach$LT$S$C$$u20$F$C$$u20$U$GT$$u20$as$u20$futures..future..Future$GT$::poll::hce925305841b9174 ()
#10 0x000055555557cd8a in mles_client::main::h6272790849884ec7 ()
#11 0x00005555555c38c6 in std::panicking::try::do_call<fn(),()> () at /checkout/src/libstd/panicking.rs:454
#12 0x00005555555ca7eb in panic_unwind::__rust_maybe_catch_panic () at /checkout/src/libpanic_unwind/lib.rs:98
#13 0x00005555555c4117 in std::panicking::try<(),fn()> () at /checkout/src/libstd/panicking.rs:433
#14 std::panic::catch_unwind<fn(),()> () at /checkout/src/libstd/panic.rs:361
#15 std::rt::lang_start () at /checkout/src/libstd/rt.rs:57
#16 0x00007ffff6b42830 in __libc_start_main (main=0x5555555821b0 <main>, argc=3, argv=0x7fffffffdfd8, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>,
stack_end=0x7fffffffdfc8) at ../csu/libc-start.c:291
#17 0x000055555555f5c9 in _start ()
Ref jq-rs/mles-rs#80
Something new for previously working code?
error[E0277]: the trait bound `tungstenite::handshake::client::Request<'static>: std::convert::From<url::Url>` is not satisfied
--> src\exchanges\connection.rs:24:30
|
24 | let (ws, response) = tokio_tungstenite::connect_async(url).compat().await?;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `std::convert::From<url::Url>` is not implemented for `tungstenite::handshake::client::Request<'static>`
|
= help: the following implementations were found:
<tungstenite::handshake::client::Request<'static> as std::convert::From<url::Url>>
= note: required because of the requirements on the impl of `std::convert::Into<tungstenite::handshake::client::Request<'static>>` for `url::Url`
= note: required by `tokio_tungstenite::connect::connect_async`
I would like to do something different for each received messages depending of the value of the path.
I can get the path using accept_hdr_async() with a callback but it seems only usable to deny the connection or to add extra headers.
First, thanks for the sweet crate!
start_send()
just accepts whatever message you get it and always asks for more! If the messages are coming in faster than Tungstenite can get them out, the underlying WebSocket::send_queue
just fills up forever.
Besides the memory usage problem, in my server I want to deal with clients that can't take the bandwidth, either by kicking them or by dropping some packets. The wrapper Sink
I made to accomplish this is especially hacky to work around the fact that start_send()
will never refuse a message.
Perhaps there should be a configurable max send_queue
len? I can make a PR if you're into that.
Hi, i am writing a server where i need websocket connections. I have a whole server written in hyper
. How can i add tokio-tungstenite
to the hyper server ?
Would be good if you could show a simple example of that as i am still a newbie in Rust :)
Thank you.
I'm having a hard time handling new connections. When a new stream connects I want to send messages directly back and forth between that client to establish username/etc. However, the examples aren't very clear to me on how to create a handle_connection function.
I noticed in tungstenite-rs the autobahn server example does though. Is it similar to that? I've tried sending using the sink but I can't because ws_writer uses it.
Any advice would be great.
Hello!
I have a couple of questions about using tokio-tungstenite crate in a project, when user can send a multiple requests to a WebSocket server which is working with Kafka and returns some result to the caller:
Hi guys i maybe have a noob question, but i need help. I can manage to open a new connection but cant hold it infinitely it drops after some time. Here is a code:
let task_orderbook = connect_async(url.clone()).and_then(|(ws, _response)| {
println!("Orderbook stream successfully established!");
let subscription = "{\"event\":\"subscribe\",\"channel\":\"book\",\"prec\":\"R0\",\"symbol\":\"tBTCUSD\"}";
let (sink, stream) = ws.split();
sink.send(subscription.into()).wait().ok();
let result = stream.for_each(move |message| { stdout.write_all(&message.into_data()).unwrap(); stdout.flush().unwrap(); Ok(()) });
result
.map(|x|{println!("{:?}",x);()})
//.select(sink.send(subscription.into()).map_err(|_|()))
.then(|_|Ok(()))
}).map_err(|e| {
println!("Error during the websocket handshake occurred: {}", e);
io::Error::new(io::ErrorKind::Other, e)
});
tokio::run(task_orderbook);
I was surprised to see a new native thread being spawn for each new connection established by async_connect
. What's the purpose?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.