Giter Club home page Giter Club logo

Comments (10)

daniel-abramov avatar daniel-abramov commented on July 16, 2024
  1. At the moment we print only the highlevel information when the error occurs, like "TLS Error", "Connection closed", there are different types of errors which can occur. When the connection is closed, there are plenty optional error codes in WebSocket RFC. In any case it's very unlikely that you want to show the error message from the library to the user directly. But if you want to implement some custom message or custom behavior depending on the error code, you can check the close frame documentation, the list of codes (1001 etc) can also be found in documentation. For instance 1001 corresponds to Away.
  2. As you like, if the JSON parsing in your code has failed, it's sensible to handle the error in place, unless the error is critical for your particular task and you want to raise some WebSocket error. But beware, you cannot handle such errors in the or_else block which corresponds to the open connection, as at the point where you received an error message (Connection closed for instance), the connection is already closed.

from tokio-tungstenite.

Relrin avatar Relrin commented on July 16, 2024

So, as far as I understood:

  1. If necessary to handle errors (like closing the connection by a client) on the server side and don't provide a detail information about it – use or_else / mar_err blocks for it.
  2. For handling errors for specific logic, like validating a message before using it further, handle it in place (perhaps macro?) where is necessary to return an appropriate response with the help of sink.start_send(...) method.

That's correct?

from tokio-tungstenite.

daniel-abramov avatar daniel-abramov commented on July 16, 2024
  1. Depending on your error handling strategy. But if the error comes from one of the tokio-tungstenite components, the convenient way would be to handle it in or_else or map_err, yes. Keep in mind, you are not obliged to use the tokio-tungstenite the way it's described, you may write your own future and poll tokio-tungstenite futures from your code if you have a more complex logic. Use what fits better for your needs (for most usecases like request/response architectures such usage with combinators is very common and convenient, but for more complex workflow you can use tokio futures directly).
  2. If the error occurs in your own code (parsing the message format / doing some stuff), you may want to handle it as you like and then queue one or more response messages.

If I were you, I would not use sink directly, I would create an mpsc channel from futures-rs crate and used it as network sink (you can also find this in server example). Something like that:

accept_async(stream).and_then(move |ws_stream| {
    let (tx, rx) = futurse::sync::mpsc::unbounded();
    let (sink, stream) = ws_stream.split();

    let sender = Sender::new(sink);
    let engine = ServerEngine::new(sink);

    let ws_reader = stream.for_each(move |message| {
        engine.on_message(message)
    });

    let ws_writer = rx.fold(sink, |mut sink, msg| {
        use futures::Sink;
        sink.start_send(msg).unwrap();
        Ok(sink)
    });

    ws_reader.map(|_| ()).map_err(|_| ())
             .select(ws_writer.map(|_| ()).map_err(|_| ()));
})

...

struct Sender {
    sink: futures::sunc::mpsc::UnboundedSender<Message>
}

impl Sender {
    fn send(&mut self, message: String) { // or Vec<u8> or whatever you want
        self.sink.unbounded_send(Message::text(message));
    }
}

struct ServerEngine { 
    sender: Sender,
    ...
}

impl ServerEngine {
    fn on_message(message: message:: tungstenite::Messgae) -> Result<(), &'static str> {
        match some_parsing_stuff(message) {
            Ok(parsed) => self.sender.send("Got your message, thanks"),
            Err(parse_error) => self.sender.send("You sent a wrong message to the server"),
            Err(super_critical_error) => return Err("We have to close the server, some very bad error"),
        }
    }
}

from tokio-tungstenite.

Relrin avatar Relrin commented on July 16, 2024

It seems reasonably for me. So, I've tried to change my code to this (just to understand this implementation way and testing purposes), just removed some piece of code an made it as echo server:

use std::io::{Error, ErrorKind};
use std::net::SocketAddr;
use std::result::{Result as BaseResult};

use super::router::{Router};

use futures;
use futures::{Future, Sink};
use futures::stream::{Stream};
use json::{parse as parse_json, JsonValue};
use tokio_core::net::{TcpListener};
use tokio_core::reactor::{Core};
use tokio_tungstenite::{accept_async};
use tungstenite::protocol::{Message};


pub struct Proxy {
    router: Box<Router>,
}


impl Proxy {
    pub fn new(router: Box<Router>) -> Proxy {
        Proxy {
            router: router
        }
    }

    pub fn run(&self, address: SocketAddr) {
        let mut core = Core::new().unwrap();
        let handle = core.handle();
        let socket = TcpListener::bind(&address, &handle).unwrap();
        println!("Listening on: {}", address);

        // The server loop.
        let server = socket.incoming().for_each(|(stream, _addr)| {
            let proxy_inner = self;

            // Handler per each connection.
            accept_async(stream)
                .map_err(|err| {
                    println!("An error occurred during the WebSocket handshake: {}", err);
                    Error::new(ErrorKind::Other, err)
                })
                // Check the Auth header
                .and_then(move |ws_stream| {
                    println!("Checking the user's token");
                    Ok(ws_stream)
                })
                // Process the messages
                .and_then(move |ws_stream| {
                    let (tx, rx) = futures::sync::mpsc::unbounded();
                    let (sink, stream) = ws_stream.split();

                    let sender = Sender::new(tx);
                    let mut engine = ServerEngine::new(sender);

                    let ws_reader = stream.for_each(move |message: Message| {
                        engine.on_message(message);
                        Ok(())
                    });

                    let ws_writer = rx.fold(sink, |mut sink, msg| {
                        sink.start_send(msg).unwrap();
                        Ok(sink)
                    });

                    ws_reader.map(|_| ()).map_err(|_| ())
                             .select(ws_writer.map(|_| ()).map_err(|_| ()));

                    Ok(())
                }).or_else(|err| {
                    println!("An error occurred with the WebSocket connection: {}", err);
                    Ok(())
                })
        });

        // Run the server
        core.run(server).unwrap();
    }

    fn decode_message(&self, message: &str) -> BaseResult<Box<JsonValue>, Error> {
        match parse_json(message) {
            Ok(message) => Ok(Box::new(message)),
            Err(err) => Err(Error::new(ErrorKind::InvalidData, err))
        }
    }
}


struct Sender {
    sink: futures::sync::mpsc::UnboundedSender<Message>
}


impl Sender {
    pub fn new(sink: futures::sync::mpsc::UnboundedSender<Message>) -> Sender {
        Sender {sink: sink}
    }

    pub fn send(&mut self, message: Message) {
        self.sink.unbounded_send(message);
    }
}


struct ServerEngine {
    sender: Sender
}


impl ServerEngine {
    pub fn new(sender: Sender) -> ServerEngine {
        ServerEngine {sender: sender}
    }

    pub fn on_message(&mut self, message: Message) -> Result<(), &'static str> {
        self.sender.send(message);
        Ok(())
    }
}

It's compiling, but don't let me to send a message to a client, because the connection is closed right after it established. Could you help with the find the "a right way" to prolong the connection as long as possible, without closing it (it the way, like I mentioned in the first message of this topic)?

from tokio-tungstenite.

daniel-abramov avatar daniel-abramov commented on July 16, 2024
                   ws_reader.map(|_| ()).map_err(|_| ())
                             .select(ws_writer.map(|_| ()).map_err(|_| ()));

                    Ok(())

You again dropped the future which combines reader and writer and returned Ok(()), that's why it closes the connection (the ws_reader + ws_writer futures are not being executed).

from tokio-tungstenite.

agalakhov avatar agalakhov commented on July 16, 2024

You're doing the same your favorite mistake again. Now you dropped an important future variable before polling it. Remember, future is not a thread! In the variable you store the code that isn't executed yet and not the execution result! You dropped a future without including it into your job, so no surprise that it isn't called. It is like declaring a function without calling it.

from tokio-tungstenite.

daniel-abramov avatar daniel-abramov commented on July 16, 2024

Just out of curiosity: why don't you want to take the server example and just replace the for_each content by your engine code? It would do exactly what you want, isn't it so?

from tokio-tungstenite.

Relrin avatar Relrin commented on July 16, 2024

The reason why I'm doing it's get an experience with the futures features, with which I haven't worked before. So, for me is a new way to make asynchronous code and it will be better if sometimes literally "shooting in the leg" for get an understand how to deal with it and where necessary to take my attention. Before Rust and Tokio-rs I'd played a lot with Python and asyncio/twisted which are working with coroutines/deferreds and chaining them together.
Hereby, examples clearly demonstrating how it could be looks like if you're doing async stuff, just for acquaintance.

from tokio-tungstenite.

agalakhov avatar agalakhov commented on July 16, 2024

Please refer to the documentation and examples of Tokio itself. Tungstenite is just a special case.
The general idea of Tokio is that you construct an object that does the job (future) first and execute it later. The execution happens in the run() function. You must make sure that all parts of your code reach run(). The code is contained in return values of functions you call. I.e., if you do ws_reader.map(...).select(...).map(...), the return value is the actual code, and the code won't be executed unless the return value reaches run().

from tokio-tungstenite.

Relrin avatar Relrin commented on July 16, 2024

Ok, thank you for the detailed answers!

from tokio-tungstenite.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.