Comments (10)
- At the moment we print only high-level information when an error occurs, like "TLS Error" or "Connection closed", but there are different types of errors which can occur. When the connection is closed, the WebSocket RFC defines plenty of optional close codes. In any case, it's very unlikely that you want to show the library's error message to the user directly. But if you want to implement a custom message or custom behavior depending on the close code, check the close frame documentation; the list of codes (1001 etc.) can also be found there. For instance, 1001 corresponds to `Away`.
- As you like: if the JSON parsing in your code has failed, it's sensible to handle the error in place, unless the error is critical for your particular task and you want to raise a WebSocket error. But beware: you cannot handle such errors in the `or_else` block which corresponds to the open connection, because at the point where you receive an error message ("Connection closed", for instance), the connection is already closed.
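To make the close-code idea concrete, here is a minimal, self-contained sketch of mapping codes to custom messages. The `CloseCode` enum below is a hypothetical stand-in for the one the WebSocket library exposes (tungstenite has a similar enum with an `Away` variant for 1001):

```rust
// Hypothetical stand-in for the library's close-code enum;
// the numeric codes come from the WebSocket RFC (RFC 6455).
#[allow(dead_code)]
#[derive(Debug, Clone, Copy)]
enum CloseCode {
    Normal,     // 1000
    Away,       // 1001
    Protocol,   // 1002
    Other(u16), // any other code
}

// Map a close code to a custom, user-facing message instead of
// showing the library's error text directly.
fn describe(code: CloseCode) -> &'static str {
    match code {
        CloseCode::Normal => "the peer closed the connection normally",
        CloseCode::Away => "the peer is going away",
        CloseCode::Protocol => "the peer reported a protocol error",
        CloseCode::Other(_) => "the connection was closed",
    }
}

fn main() {
    println!("{}", describe(CloseCode::Away));
}
```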
from tokio-tungstenite.
So, as far as I understood:
- If it's necessary to handle errors on the server side (like a client closing the connection) without providing detailed information about them, use `or_else`/`map_err` blocks for that.
- For errors tied to specific logic, like validating a message before using it further, handle them in place (perhaps with a macro?) wherever it's necessary to return an appropriate response, with the help of the `sink.start_send(...)` method.

Is that correct?
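If it helps, this split can be illustrated with plain `Result`, whose `map_err` and `or_else` behave like the futures combinators of the same name (the `fetch` function here is just a made-up stand-in for a failing WebSocket operation):

```rust
// Made-up stand-in for an operation that fails with a library error.
fn fetch() -> Result<String, &'static str> {
    Err("Connection closed")
}

fn main() {
    let outcome: Result<String, String> = fetch()
        // map_err: translate the low-level error, as with futures' map_err
        .map_err(|e| format!("WebSocket error: {}", e))
        // or_else: recover from the error (log it and substitute a value),
        // as with futures' or_else
        .or_else(|e| {
            println!("handled: {}", e);
            Ok(String::from("<no response>"))
        });
    assert_eq!(outcome.unwrap(), "<no response>");
}
```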
- It depends on your error-handling strategy. But if the error comes from one of the tokio-tungstenite components, the convenient way is to handle it in `or_else` or `map_err`, yes. Keep in mind that you are not obliged to use tokio-tungstenite the way it's described here; you may write your own future and poll the tokio-tungstenite futures from your code if you have more complex logic. Use whatever fits your needs better (for most use cases, like request/response architectures, such usage with combinators is very common and convenient, but for a more complex workflow you can use Tokio futures directly).
- If the error occurs in your own code (parsing the message format, doing some other work), you may want to handle it as you like and then queue one or more response messages. If I were you, I would not use the `sink` directly; I would create an mpsc channel from the `futures-rs` crate and use it as the network sink (you can also find this in the server example). Something like this:
accept_async(stream).and_then(move |ws_stream| {
    let (tx, rx) = futures::sync::mpsc::unbounded();
    let (sink, stream) = ws_stream.split();
    let sender = Sender::new(tx);
    let mut engine = ServerEngine::new(sender);
    let ws_reader = stream.for_each(move |message| {
        engine.on_message(message);
        Ok(())
    });
    let ws_writer = rx.fold(sink, |mut sink, msg| {
        use futures::Sink;
        sink.start_send(msg).unwrap();
        Ok(sink)
    });
    // Return the combined future instead of dropping it; a dropped
    // future is never polled and the connection would close at once.
    // (In real code, map the item/error types to what and_then expects.)
    ws_reader.map(|_| ()).map_err(|_| ())
        .select(ws_writer.map(|_| ()).map_err(|_| ()))
})
...
struct Sender {
    sink: futures::sync::mpsc::UnboundedSender<Message>
}
impl Sender {
    fn send(&mut self, message: String) { // or Vec<u8> or whatever you want
        let _ = self.sink.unbounded_send(Message::text(message));
    }
}
struct ServerEngine {
    sender: Sender,
    ...
}
impl ServerEngine {
    fn on_message(&mut self, message: tungstenite::Message) -> Result<(), &'static str> {
        // assuming some_parsing_stuff returns a Result whose error type
        // distinguishes recoverable from critical failures
        match some_parsing_stuff(message) {
            Ok(_parsed) => self.sender.send("Got your message, thanks".to_string()),
            Err(ParseError::Recoverable) => self.sender.send("You sent a wrong message to the server".to_string()),
            Err(ParseError::Critical) => return Err("We have to close the server, some very bad error"),
        }
        Ok(())
    }
}
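To get a feel for the channel-as-sink idea without pulling in futures, here is a self-contained analogue using `std::sync::mpsc`. The `Sender` wrapper mirrors the sketch above; the threading and names are illustrative, not tokio-tungstenite API:

```rust
use std::sync::mpsc;
use std::thread;

// Mirrors the Sender wrapper from the sketch: the engine owns a
// cheap channel handle instead of touching the network sink.
struct Sender {
    tx: mpsc::Sender<String>,
}

impl Sender {
    fn send(&self, msg: &str) {
        // ignore the error if the writer side has already gone away
        let _ = self.tx.send(msg.to_string());
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let sender = Sender { tx };

    // The "engine" queues responses without owning the sink.
    sender.send("Got your message, thanks");
    drop(sender); // close the channel so the writer loop terminates

    // The "writer" drains the channel, like rx.fold(sink, ...) above.
    let writer = thread::spawn(move || {
        let mut out = Vec::new();
        for msg in rx {
            out.push(msg);
        }
        out
    });
    let out = writer.join().unwrap();
    assert_eq!(out, vec!["Got your message, thanks".to_string()]);
    println!("{:?}", out);
}
```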
That seems reasonable to me. So I've tried changing my code to this (just to understand this implementation approach, and for testing purposes); I removed some code and turned it into an echo server:
use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
use std::result::{Result as BaseResult};
use super::router::{Router};
use futures;
use futures::{Future, Sink};
use futures::stream::{Stream};
use json::{parse as parse_json, JsonValue};
use tokio_core::net::{TcpListener};
use tokio_core::reactor::{Core};
use tokio_tungstenite::{accept_async};
use tungstenite::protocol::{Message};
pub struct Proxy {
router: Box<Router>,
}
impl Proxy {
pub fn new(router: Box<Router>) -> Proxy {
Proxy {
router: router
}
}
pub fn run(&self, address: SocketAddr) {
let mut core = Core::new().unwrap();
let handle = core.handle();
let socket = TcpListener::bind(&address, &handle).unwrap();
println!("Listening on: {}", address);
// The server loop.
let server = socket.incoming().for_each(|(stream, _addr)| {
let proxy_inner = self;
// Handler per each connection.
accept_async(stream)
.map_err(|err| {
println!("An error occurred during the WebSocket handshake: {}", err);
Error::new(ErrorKind::Other, err)
})
// Check the Auth header
.and_then(move |ws_stream| {
println!("Checking the user's token");
Ok(ws_stream)
})
// Process the messages
.and_then(move |ws_stream| {
let (tx, rx) = futures::sync::mpsc::unbounded();
let (sink, stream) = ws_stream.split();
let sender = Sender::new(tx);
let mut engine = ServerEngine::new(sender);
let ws_reader = stream.for_each(move |message: Message| {
engine.on_message(message);
Ok(())
});
let ws_writer = rx.fold(sink, |mut sink, msg| {
sink.start_send(msg).unwrap();
Ok(sink)
});
ws_reader.map(|_| ()).map_err(|_| ())
.select(ws_writer.map(|_| ()).map_err(|_| ()));
Ok(())
}).or_else(|err| {
println!("An error occurred with the WebSocket connection: {}", err);
Ok(())
})
});
// Run the server
core.run(server).unwrap();
}
fn decode_message(&self, message: &str) -> BaseResult<Box<JsonValue>, Error> {
match parse_json(message) {
Ok(message) => Ok(Box::new(message)),
Err(err) => Err(Error::new(ErrorKind::InvalidData, err))
}
}
}
struct Sender {
sink: futures::sync::mpsc::UnboundedSender<Message>
}
impl Sender {
pub fn new(sink: futures::sync::mpsc::UnboundedSender<Message>) -> Sender {
Sender {sink: sink}
}
pub fn send(&mut self, message: Message) {
self.sink.unbounded_send(message);
}
}
struct ServerEngine {
sender: Sender
}
impl ServerEngine {
pub fn new(sender: Sender) -> ServerEngine {
ServerEngine {sender: sender}
}
pub fn on_message(&mut self, message: Message) -> Result<(), &'static str> {
self.sender.send(message);
Ok(())
}
}
It compiles, but it doesn't let me send a message to a client, because the connection is closed right after it is established. Could you help me find the "right way" to keep the connection open as long as possible without closing it (in the way I mentioned in the first message of this topic)?
ws_reader.map(|_| ()).map_err(|_| ())
.select(ws_writer.map(|_| ()).map_err(|_| ()));
Ok(())
You again dropped the future which combines the reader and the writer, and returned `Ok(())`; that's why it closes the connection (the ws_reader and ws_writer futures are never executed).
You're making your favorite mistake again: you dropped an important future before it could be polled. Remember, a future is not a thread! The variable stores code that hasn't been executed yet, not the result of execution. You dropped the future without including it in your job, so it's no surprise that it isn't called. It's like declaring a function without ever calling it.
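The "declaring a function without calling it" analogy can be made concrete without futures at all. This toy closure (nothing from Tokio here) shows that constructing work and executing it are separate steps:

```rust
fn main() {
    let mut executed = false;

    // Constructing the closure describes the work; it does not run it,
    // just like building a future does not execute it.
    let job = || executed = true;

    // Dropping it without calling it is exactly the bug above:
    // the work is discarded, never executed.
    drop(job);

    assert!(!executed);
    println!("executed = {}", executed);
}
```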
Just out of curiosity: why don't you want to take the server example and just replace the `for_each` content with your engine code? It would do exactly what you want, wouldn't it?
The reason I'm doing it this way is to gain experience with futures, which I haven't worked with before. It's a new way for me to write asynchronous code, and it's better to sometimes literally "shoot myself in the foot" to understand how to deal with it and where I need to pay attention. Before Rust and tokio-rs I played a lot with Python and asyncio/Twisted, which work with coroutines/deferreds and chain them together.
That said, examples clearly demonstrating what async code could look like are useful, just for getting acquainted.
Please refer to the documentation and examples of Tokio itself. Tungstenite is just a special case.
The general idea of Tokio is that you first construct an object that does the job (a future) and execute it later. The execution happens in the `run()` function. You must make sure that all parts of your code reach `run()`. The code is contained in the return values of the functions you call. I.e., if you do `ws_reader.map(...).select(...).map(...)`, the return value is the actual code, and that code won't be executed unless the return value reaches `run()`.
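This construct-then-execute model can be sketched with plain closures; `plan` only describes the work, and nothing happens until `run` is called, just as a future does nothing until it reaches `core.run()`:

```rust
// `run` stands in for core.run(): it is the only place execution happens.
fn run<F: FnOnce() -> i32>(plan: F) -> i32 {
    plan() // execution happens here, not at construction
}

fn main() {
    let plan = || 2 + 2; // constructed, not yet executed
    let result = run(plan); // now the work actually runs
    assert_eq!(result, 4);
    println!("result = {}", result);
}
```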
Ok, thank you for the detailed answers!