ntex-mqtt's Introduction

MQTT Client/Server framework

MQTT Client/Server framework for ntex with support of v5 and v3.1.1 protocols

ntex-mqtt's People

Contributors

aj9411, belltoy, ctron, dpypin, estin, fafhrd91, flier, massand, nastrels, nayato, pgm-pgm, phial3, therealprof, vadim-kovalyov, viouyang, wpbrown

ntex-mqtt's Issues

How to use as a client?

I'd like to use actix-mqtt as an MQTT client, but I'm struggling to get it to work for anything more than a one-shot QoS 0 PUBLISH message after a CONNECT, as exercised by tests/test_server.rs. The problem boils down to the connection being dropped automatically after the state function is called; the ClientSession passed to ack.state is also simply dropped, and the remaining callbacks, such as finish, are never called.

What is the right way to keep the session open and how can the session be used to actually interface with actors?

How to use mpsc::channel to dispatch messages when handling incoming publishes, and how to subscribe to multiple topics in one API call?

Kindly help me with two questions:

  1. How can I subscribe to multiple topics (all with the same SubscriptionOptions) in one call?
  2. I want to dispatch incoming publishes through an mpsc::Sender to a different thread, so that messages are processed without blocking the client. However, I can't find a proper way to let fn_service(|control: v5::client::ControlMessage<Error>| use an mpsc::Sender that I created. Using the client router, client.resource("response", publish), poses the same challenge.

Much appreciated.
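
For the second question, one common pattern is to move a clone of the Sender into the handler closure, so the handler only forwards the message and a separate thread does the processing. The sketch below uses only the standard library; `IncomingPublish`, `make_handler`, `run_worker`, and `demo` are hypothetical names standing in for the real ntex-mqtt publish type and service closure, which are not shown here.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for an incoming publish (not an ntex-mqtt type).
#[derive(Debug, Clone, PartialEq)]
pub struct IncomingPublish {
    pub topic: String,
    pub payload: Vec<u8>,
}

/// Builds a handler that owns the sender it was given.
/// The closure only forwards the message, so it never waits for processing.
pub fn make_handler(tx: Sender<IncomingPublish>) -> impl Fn(IncomingPublish) -> Result<(), String> {
    move |msg| tx.send(msg).map_err(|e| format!("worker is gone: {e}"))
}

/// Spawns a worker thread that drains the channel until every sender is dropped
/// and returns the topics it processed.
pub fn run_worker(rx: Receiver<IncomingPublish>) -> JoinHandle<Vec<String>> {
    thread::spawn(move || rx.iter().map(|msg| msg.topic).collect())
}

/// Wires both halves together and feeds the handler a few messages.
pub fn demo(topics: &[&str]) -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let worker = run_worker(rx);
    let handler = make_handler(tx);
    for topic in topics {
        handler(IncomingPublish { topic: topic.to_string(), payload: b"hello".to_vec() })
            .expect("worker should still be running");
    }
    drop(handler); // dropping the last sender lets the worker loop end
    worker.join().expect("worker panicked")
}
```

Because the closure owns its sender, the same shape should carry over to a closure passed to a service constructor, as long as the sender is cloned once per closure that needs it.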

Issues with keep-alive after updating from 0.9 to 0.12

Hello 👋🏻

We've been using this crate for a little while now and thought it was time to update from 0.9 to 0.12. But ever since we did, we noticed our clients are reconnecting every 15 seconds. To make sure the "problem" still exists in the latest version we pulled this repository and used the crate from a local path, but this still gives the same disconnects.

We did some tests and enabled TRACE logging which shows us the following interesting bits:

[2023-12-08T12:17:18Z TRACE] Starting mqtt v5 handshake
[2023-12-08T12:17:18Z TRACE] Keep alive 1: 10 // We've added this line to see if the keep-alive of the client was read correctly
[2023-12-08T12:17:18Z INFO ] new connection from client-id=be6b355e-2723-4dc5-814a-ab76e8bbd503
[2023-12-08T12:17:18Z TRACE] Sending: ConnectAck {
        session_present: false,
        reason_code: Success,
        session_expiry_interval_secs: None,
        receive_max: 65535,
        max_qos: AtLeastOnce,
        max_packet_size: None,
        assigned_client_id: None,
        topic_alias_max: 32,
        retain_available: true,
        wildcard_subscription_available: true,
        subscription_identifiers_available: true,
        shared_subscription_available: true,
        server_keepalive_sec: None,
        response_info: None,
        server_reference: None,
        auth_method: None,
        auth_data: None,
        reason_string: None,
        user_properties: [],
    }
[2023-12-08T12:17:18Z TRACE] Keep alive 2: 15 - None // This is again a line we added ourselves for additional debugging
[2023-12-08T12:17:18Z TRACE] Connection handshake succeeded
[2023-12-08T12:17:18Z TRACE] Connection handler is created, starting dispatcher
[2023-12-08T12:17:18Z DEBUG] Start keep-alive timer Seconds(15)

After this snippet you see a lot of publish packets coming in (we are testing this with a test client that continuously sends data), and then after 15 seconds we see this part:

[2023-12-08T12:19:29Z TRACE] Keep-alive error, stopping dispatcher
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::KeepAliveTimeout

And in between the publish messages we saw 2 of these:

[2023-12-08T12:19:16Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))
...
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))

The client we are using for testing didn't change when we updated this crate from 0.9 to 0.12, so this is caused either by some (default) configuration that was changed, updated, or introduced in 0.12, or by some kind of bug.

I will continue to debug this one myself as well, but I'm hoping this might sound familiar or sound like something you may have seen before? Any suggestions that might push us in the right direction in order to resolve the disconnects are very much appreciated.

Thanks!
Sander
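
When debugging a report like this, it can help to model which events push the keep-alive deadline back. The sketch below is a standard-library model of a deadline that is reset on every received packet, so a steady stream of PUBLISH or PINGREQ packets should never let it expire. `KeepAlive` and its methods are hypothetical names, not the ntex-mqtt implementation.

```rust
use std::time::{Duration, Instant};

/// Hypothetical keep-alive tracker (illustrative only).
pub struct KeepAlive {
    timeout: Duration,
    deadline: Instant,
}

impl KeepAlive {
    /// Starts the timer at `now`; it expires `timeout` later unless reset.
    pub fn new(timeout: Duration, now: Instant) -> Self {
        KeepAlive { timeout, deadline: now + timeout }
    }

    /// Call for every control packet received (PUBLISH, PINGREQ, ...).
    pub fn on_packet(&mut self, now: Instant) {
        self.deadline = now + self.timeout;
    }

    /// True once the deadline has passed without any packet.
    pub fn is_expired(&self, now: Instant) -> bool {
        now >= self.deadline
    }
}
```

With a 15 second timeout, a packet seen 14 seconds after the start moves the deadline to 29 seconds after the start. A dispatcher that still times out at 15 seconds would therefore not be resetting the deadline for that packet type.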

MqttServer drops QoS 0 messages when the client disconnects

It looks like MqttServer handles MQTT messages incorrectly when a client disconnects right after sending publications to the server.
When I use mosquitto_pub to send one QoS 0 message and then disconnect, MqttServer doesn't process the incoming message (it doesn't pass it to the publish handler). However, it handles a QoS 1 message properly.

In both cases it doesn't read the DISCONNECT message sent afterwards by the client.

For QoS 0 messages:

[2021-05-14T21:40:50Z INFO  ntex::server::builder] Starting 1 workers
[2021-05-14T21:40:50Z INFO  ntex::server::builder] Starting "mqtt" service on 0.0.0.0:1883
[2021-05-14T21:40:50Z TRACE ntex::server::accept] Starting server accept loop
[2021-05-14T21:40:50Z TRACE ntex::server::worker] Service "mqtt" is available
[2021-05-14T21:40:56Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 127.0.0.1:1883, peer: 127.0.0.1:54878, fd: 16 })
[2021-05-14T21:40:56Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Start connection handshake
[2021-05-14T21:40:56Z TRACE ntex_mqtt::v3::server] Starting mqtt handshake
[2021-05-14T21:40:56Z INFO  ntex_broker] Client mosq-GgswNn5xvTJLbAZeJg connected
[2021-05-14T21:40:56Z TRACE ntex_mqtt::v3::server] Sending success handshake ack: ConnectAck {
        session_present: false,
        return_code: ConnectionAccepted,
    }
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Connection handshake succeeded
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Connection handler is created, starting dispatcher
[2021-05-14T21:40:56Z TRACE ntex::framed::state] io stream is disconnected
[2021-05-14T21:40:56Z TRACE ntex::framed::write] write io is closed
[2021-05-14T21:40:56Z TRACE ntex_mqtt::io] dispatcher is instructed to stop
[2021-05-14T21:40:56Z TRACE ntex_mqtt::io] service shutdown is completed, stop
[2021-05-14T21:40:56Z INFO  ntex_broker] Client mosq-GgswNn5xvTJLbAZeJg closed connection
[2021-05-14T21:40:56Z INFO  ntex_broker] Sink worker for mosq-GgswNn5xvTJLbAZeJg stopped
^C[2021-05-14T21:40:59Z INFO  ntex::server::builder] SIGINT received, exiting
[2021-05-14T21:40:59Z TRACE ntex::server::accept] Stopping socket listener: 0.0.0.0:1883

For QoS 1 messages:

[2021-05-14T21:41:02Z INFO  ntex::server::builder] Starting 1 workers
[2021-05-14T21:41:02Z INFO  ntex::server::builder] Starting "mqtt" service on 0.0.0.0:1883
[2021-05-14T21:41:02Z TRACE ntex::server::accept] Starting server accept loop
[2021-05-14T21:41:02Z TRACE ntex::server::worker] Service "mqtt" is available
[2021-05-14T21:41:05Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 127.0.0.1:1883, peer: 127.0.0.1:54880, fd: 16 })
[2021-05-14T21:41:05Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Start connection handshake
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::server] Starting mqtt handshake
[2021-05-14T21:41:05Z INFO  ntex_broker] Client mosq-mhyIWwydds6JJIbXCh connected
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::server] Sending success handshake ack: ConnectAck {
        session_present: false,
        return_code: ConnectionAccepted,
    }
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Connection handshake succeeded
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Connection handler is created, starting dispatcher
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::dispatcher] Dispatch packet: Publish(
        Publish {
            packet_id: Some(
                1,
            ),
            topic: "topic",
            dup: false,
            retain: false,
            qos: AtLeastOnce,
            payload: "<REDACTED>",
        },
    )
[2021-05-14T21:41:05Z INFO  ntex_broker] PUBLISH Publish { packet_id: Some(1), topic: "topic", dup: false, retain: false, qos: AtLeastOnce, payload: "<REDACTED>" }
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::dispatcher] Publish result for packet Some(1) is ready
[2021-05-14T21:41:05Z TRACE ntex::framed::state] io stream is disconnected
[2021-05-14T21:41:05Z TRACE ntex::framed::write] write io is closed
[2021-05-14T21:41:05Z TRACE ntex_mqtt::io] dispatcher is instructed to stop
[2021-05-14T21:41:05Z TRACE ntex_mqtt::io] service shutdown is completed, stop
[2021-05-14T21:41:05Z INFO  ntex_broker] Client mosq-mhyIWwydds6JJIbXCh closed connection
[2021-05-14T21:41:05Z INFO  ntex_broker] Sink worker for mosq-mhyIWwydds6JJIbXCh stopped
^C[2021-05-14T21:41:09Z INFO  ntex::server::builder] SIGINT received, exiting
[2021-05-14T21:41:09Z TRACE ntex::server::accept] Stopping socket listener: 0.0.0.0:1883

mosquitto_pub command output:

$ mosquitto_pub -h localhost -p 1883 -t topic -d -m hello -q 0
Client mosq-GgswNn5xvTJLbAZeJg sending CONNECT
Client mosq-GgswNn5xvTJLbAZeJg received CONNACK (0)
Client mosq-GgswNn5xvTJLbAZeJg sending PUBLISH (d0, q0, r0, m1, 'topic', ... (5 bytes))
Client mosq-GgswNn5xvTJLbAZeJg sending DISCONNECT

$ mosquitto_pub -h localhost -p 1883 -t topic -d -m hello -q 1
Client mosq-mhyIWwydds6JJIbXCh sending CONNECT
Client mosq-mhyIWwydds6JJIbXCh received CONNACK (0)
Client mosq-mhyIWwydds6JJIbXCh sending PUBLISH (d0, q1, r0, m1, 'topic', ... (5 bytes))
Client mosq-mhyIWwydds6JJIbXCh received PUBACK (Mid: 1, RC:0)
Client mosq-mhyIWwydds6JJIbXCh sending DISCONNECT

Send to subscribers.

Any advice will be appreciated. Do I understand correctly that the subscriptions we accept are not served automatically? That is, for every incoming publish with some routing key, should we manually iterate over all sessions and their subscriptions to find the matching ones, and then publish the message to their sinks? Or is there a better mechanism for that?
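
If routing does have to be done by hand, the core of it is matching each incoming topic name against the stored topic filters. Below is a minimal sketch of MQTT wildcard matching (`+` for one level, `#` for all remaining levels) using only the standard library. `topic_matches` and `matching_sessions` are hypothetical helpers, not ntex-mqtt APIs, and the sketch ignores `$`-prefixed topics and shared subscriptions.

```rust
/// Returns true when `topic` matches the MQTT topic `filter`.
pub fn topic_matches(filter: &str, topic: &str) -> bool {
    let mut filter_levels = filter.split('/');
    let mut topic_levels = topic.split('/');
    loop {
        match (filter_levels.next(), topic_levels.next()) {
            // `#` matches the parent level and everything below it
            (Some("#"), _) => return true,
            // `+` matches exactly one level, whatever its content
            (Some("+"), Some(_)) => continue,
            // literal levels must be equal
            (Some(f), Some(t)) if f == t => continue,
            // both exhausted at the same time: full match
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Collects the ids of all sessions that have at least one matching filter.
/// `subscriptions` pairs a session id with that session's topic filters.
pub fn matching_sessions<'a>(
    subscriptions: &'a [(&'a str, Vec<&'a str>)],
    topic: &str,
) -> Vec<&'a str> {
    subscriptions
        .iter()
        .filter(|(_, filters)| filters.iter().any(|f| topic_matches(f, topic)))
        .map(|(id, _)| *id)
        .collect()
}
```

A linear scan like this is the simplest option; a broker with many subscriptions would more likely index filters in a trie keyed by topic level.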

Allow getting access to the underlying socket

Maybe this is already possible, and I just couldn't find a way.

We would need access to the underlying socket of the connection, at least for handling the "connect" request.

Our use case is that, when handling the "connect" packet, we need to have access to the X.509 client certificates (if any) for validation. Both openssl and rustls provide calls to extract this information from the socket. However, I couldn't find a way to get access to the socket.

I do understand that, depending on the implementation type, the socket could be of a different type. However, others seem to solve this by using a &dyn Any argument, which you can then "downcast" to check for the concrete type. I guess other ways should also be possible.

If it helps, I would try to create a PR to fix this issue. However, I don't have a plan yet for how to achieve this.
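
The downcast approach mentioned above can be sketched with the standard library alone. `PlainSocket`, `TlsSocket`, and `peer_certificate` are hypothetical placeholders; real code would downcast to the concrete openssl or rustls stream type and call that library's own accessor.

```rust
use std::any::Any;

/// Hypothetical socket wrappers used only for this illustration.
pub struct PlainSocket;
pub struct TlsSocket {
    pub peer_certificate: Option<Vec<u8>>,
}

/// Tries to recover the client certificate from a type-erased socket.
/// Returns `None` for non-TLS sockets and for TLS sockets without a certificate.
pub fn peer_certificate(io: &dyn Any) -> Option<&[u8]> {
    io.downcast_ref::<TlsSocket>()?.peer_certificate.as_deref()
}
```

The handler stays independent of the transport type, at the cost of a runtime check for each concrete type it wants to support.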

Possible session leak

I have the feeling there is a session leak in the server code in case the server gets "too many" messages.

This is the scenario:

  • Use the subs.rs example

  • Instead of returning Ok() in the publish method, use:

    tokio::time::sleep(Duration::from_secs(1)).await;
    Err(MyServerError)
  • Implement Drop for MySession, print out something

Now when you send one message, it processes it, fails, closes the connection and prints out the message from drop.

If you send e.g. 20 messages at once, it processes all of them and closes the connection, but doesn't print out the message. This is pretty reproducible.

My assumption is, the session is still lingering somewhere.
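
One way a Drop message can go missing is that some other owner still holds a reference to the session. Whether that is what happens in the server is only a guess; the sketch below just demonstrates the mechanism with the standard library, using a hypothetical `MySession` behind an `Rc` and a drop counter.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical session that counts how many times it has been dropped.
pub struct MySession {
    drops: Rc<Cell<u32>>,
}

impl Drop for MySession {
    fn drop(&mut self) {
        self.drops.set(self.drops.get() + 1);
    }
}

/// Returns the drop count seen while a clone is still alive,
/// followed by the count after the last clone is gone.
pub fn observe_drops() -> (u32, u32) {
    let drops = Rc::new(Cell::new(0));
    let session = Rc::new(MySession { drops: drops.clone() });
    let lingering = session.clone(); // e.g. held by a queued, never-completed task
    drop(session);
    let while_held = drops.get(); // `Drop` has not run yet
    drop(lingering);
    (while_held, drops.get())
}
```

Logging `Rc::strong_count` at the point where the connection closes is one way to check whether such an extra owner exists.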

Allow using TLS

It looks to me as if TLS currently isn't supported. I think it would be useful to have support for TLS.

mqtt 5.0 session expiration interval

Hello @fafhrd91
Is the "session expiration interval" handled by the framework?
I did not find the relevant code in the source.
If it is not handled and I need to support the "session expiration interval", what should I do?
At present, I don't know much about how the framework stores and processes the "Session". I hope you can point me to the specific location of the relevant code.

tokio::main support

Is there any chance I can use v3::MqttServer from within tokio::spawn?

Like this:

...
#[tokio::main]
async fn main() {
    ...

    let mqtt_server = tokio::spawn(async move {
        v3::MqttServer::new(handshake_v3).publish(publish_v3).run().await
    });
    let _join = tokio::try_join!(mqtt_server).expect("Cannot join spawned futures!");
}

PubAckReason enum contains ReceiveMaximumExceeded

The PubAckReason enum includes the value ReceiveMaximumExceeded (147). While this is a valid code for the DISCONNECT packet, my reading of the spec is that it is not a valid value for PUBACK.

Source code:

ReceiveMaximumExceeded = 147,

MQTT 5 specification reference: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901031

I propose that we should remove this value from the enum, unless there is a compelling reason/dependency on it currently elsewhere that would force it to stay in. I observed this issue when I tried issuing this PUBACK reason and the client I was using (non-ntex-mqtt) was unable to properly parse the result code.
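
The check can be expressed as a small validation helper. The list below follows the PUBACK reason code table of the MQTT 5 specification as I read it; `is_valid_puback_reason` is a hypothetical function and not part of ntex-mqtt.

```rust
/// Returns true if `code` is a reason code that MQTT 5 lists for PUBACK.
pub fn is_valid_puback_reason(code: u8) -> bool {
    matches!(
        code,
        0x00 // Success
        | 0x10 // No matching subscribers
        | 0x80 // Unspecified error
        | 0x83 // Implementation specific error
        | 0x87 // Not authorized
        | 0x90 // Topic Name invalid
        | 0x91 // Packet Identifier in use
        | 0x97 // Quota exceeded
        | 0x99 // Payload format invalid
    )
}
```

Under this table, 147 (0x93, Receive Maximum exceeded) is rejected, which matches the behavior of the client that failed to parse it.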

KeepAlive is not working for PingRequest

I'm testing with the basic example and have three issues with the client disconnect timeout:

1 - The client gets disconnected before it sends a ping request to the server;
2 - If I change the server source code to set the timeout to 1.5 times the keep-alive value, the client still gets disconnected, because the ping request packet doesn't refresh the keep-alive timer the way a publish packet does;
3 - When the client tries to disable keep-alive (keepalive = 0), the connection is closed immediately.

The client only connects to the server and subscribes to one topic. If the client sends publish messages periodically, the connection stays alive.
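
For points 1 and 3, the MQTT specification has the server wait one and a half times the keep-alive value before closing the connection, and treats a keep-alive of 0 as turning the mechanism off. A minimal standard-library sketch of that rule follows; `keep_alive_timeout` is a hypothetical helper, not the crate's code.

```rust
use std::time::Duration;

/// Server-side timeout derived from the CONNECT keep-alive value.
/// Returns `None` when the client sent 0, which disables the timeout.
pub fn keep_alive_timeout(keep_alive_secs: u16) -> Option<Duration> {
    if keep_alive_secs == 0 {
        None
    } else {
        // 1.5 x keep-alive, computed in milliseconds to avoid floating point
        Some(Duration::from_millis(u64::from(keep_alive_secs) * 1500))
    }
}
```

Returning an `Option` makes the "disabled" case explicit, so a caller cannot accidentally arm a zero-length timer.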

SubscribeAckReason enum has a typo

Definitely not a major issue, but I observed that the SubscribeAckReason enum value for SharedSubsriptionNotSupported has Subscription misspelled:

SharedSubsriptionNotSupported = 158,

I'm happy to correct the issue myself, but unsure how you want to approach what would technically be a breaking change for consumers of the library? Obviously this enum value being misspelled is not a functionality issue, but I figured I would open the issue in case cleaning it up is of interest to you.
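
One possible way to soften the breaking change is to rename the variant and keep the old spelling as a deprecated associated constant. This is only a sketch on a cut-down copy of the enum, and the alias is not a perfect substitute: it is not a real variant, so glob imports of the variants would not pick it up.

```rust
/// Cut-down copy of the enum for illustration; other variants are omitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum SubscribeAckReason {
    SharedSubscriptionNotSupported = 158,
}

#[allow(non_upper_case_globals)]
impl SubscribeAckReason {
    /// Old, misspelled name kept so that existing code still compiles.
    #[deprecated(note = "use `SharedSubscriptionNotSupported` instead")]
    pub const SharedSubsriptionNotSupported: Self = Self::SharedSubscriptionNotSupported;
}
```

Existing callers would then get a deprecation warning instead of a compile error, and the alias could be removed in the next major release.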

cargo-audit nanorand 0.5.2

cargo-audit output:

Crate:         nanorand
Version:       0.5.2
Title:         Aliased mutable references from `tls_rand` & `TlsWyRand`
Date:          2021-09-23
ID:            RUSTSEC-2021-0114
URL:           https://rustsec.org/advisories/RUSTSEC-2021-0114
Solution:      Upgrade to >=0.6.1
Dependency tree:
nanorand 0.5.2
└── ntex 0.4.1
    └── ntex-mqtt 0.7.2

How to respond to Qos1 subscription in v5

Hello, I am having trouble responding to a subscription.
I imitated the implementation of drogue-cloud/mqtt-integration and wanted to build an MQTT broker. https://github.com/drogue-iot/drogue-cloud/blob/main/mqtt-integration/src
Everything looked normal at first.
drogue-cloud/mqtt-integration only implements subscriptions with QoS 0.
I hope to support QoS 1.
A problem appeared when I replaced send_at_most_once() with send_at_least_once():

  • The client terminal automatically closes after receiving the subscription information once.

Related information I know so far:

  • send_at_least_once() returns PublishQos1Error::Disconnected
  • Received v5::ControlMessage::ProtocolError in Control

The error is as follows:

[2021-09-06T11:01:27Z INFO  session] Hello
[2021-09-06T11:01:27Z INFO  ntex::server::builder] Starting 1 workers
[2021-09-06T11:01:27Z INFO  ntex::server::builder] Starting "mqtt" service on 127.0.0.1:1883
[2021-09-06T11:01:34Z INFO  session] new connection: Connect { clean_start: true, keep_alive: 60, session_expiry_interval_secs: None, auth_method: None, auth_data: None, request_problem_info: true, request_response_info: false, receive_max: None, topic_alias_max: 0, user_properties: [], max_packet_size: None, last_will: Some(LastWill { qos: AtLeastOnce, retain: false, topic: "aaaaaaaa", message: b"aaaaaaaaaaaaaaaaaaaaaa", will_delay_interval_sec: Some(5), correlation_data: None, message_expiry_interval: Some(5), content_type: Some("aaaaaaaa"), user_properties: [], is_utf8_payload: Some(true), response_topic: None }), client_id: "mqttx_6dfcaa94", username: Some("admin"), password: Some(b"1234567") }
[2021-09-06T11:01:34Z ERROR session] ProtocolError { err: Decode(MalformedPacket), pkt: Disconnect { reason_code: ImplementationSpecificError, session_expiry_interval_secs: None, server_reference: None, reason_string: None, user_properties: [] } }
[2021-09-06T11:01:34Z ERROR session] Disconnected

This is a simple example based on examples/session.rs that reproduces the error I see in practice (the rest of the code is the same as examples/session.rs):

async fn control_v5<E>(
    session: v5::Session<MySession>,
    control: v5::ControlMessage<E>,
) -> Result<v5::ControlResult, MyServerError> {
    match control {
        v5::ControlMessage::Auth(a) => {
            // we don't do extended authentication (yet?)
            Ok(a.ack(Auth::default()))
        }
        v5::ControlMessage::Error(e) => Ok(e.ack(DisconnectReasonCode::UnspecifiedError)),
        v5::ControlMessage::ProtocolError(pe) => {
            error!("{:?}", pe);
            Ok(pe.ack())
        }
        v5::ControlMessage::Ping(p) => Ok(p.ack()),
        v5::ControlMessage::Disconnect(d) => Ok(d.ack()),
        v5::ControlMessage::Subscribe(mut s) => {
            s.iter_mut().for_each(|mut s| {
                let sink = session.sink().clone();
                let topic = s.topic().clone();
                ntex::rt::spawn(async move {
                    match sink.publish(topic, Bytes::from("A")).send_at_least_once().await {
                        Ok(w) => {
                            error!("{:?}", w)
                        }
                        Err(e) => {
                            error!("{:?}", e)
                        }
                    };
                });
                s.subscribe(v5::QoS::AtLeastOnce);
            });

            Ok(s.ack())
        }
        v5::ControlMessage::Unsubscribe(u) => Ok(u.ack()),
        v5::ControlMessage::Closed(c) => Ok(c.ack()),
    }
}

How should I respond to the subscription correctly (QoS 1)?

mqtt-ws-server.rs ws connection fail

I ran the ws example and used mqttx to connect over the wss protocol, but the connection could not be established.

➜ ntex-mqtt git:(master) ✗ cargo run --example mqtt-ws-server
   Compiling ntex-mqtt v0.11.4 (/home/boring/workspace/ntex-mqtt)
    Finished dev [unoptimized + debuginfo] target(s) in 4.76s
     Running `target/debug/examples/mqtt-ws-server`
[2023-08-25T04:59:52Z INFO ntex::server::builder] Starting 1 workers
[2023-08-25T04:59:52Z INFO ntex::server::builder] Starting "mqtt" service on 0.0.0.0:8883
[2023-08-25T04:59:52Z TRACE ntex::server::accept] Starting server accept loop
[2023-08-25T04:59:52Z INFO ntex::server::accept] Starting socket listener on 0.0.0.0:8883
[2023-08-25T04:59:52Z TRACE ntex::server::worker] Service "mqtt" is available
[2023-08-25T04:59:52Z TRACE ntex::server::accept] Worker is available
[2023-08-25T04:59:59Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 192.168.2.47:8883, peer: 192.168.2.89:50088, fd: 17 }) bp: false
[2023-08-25T04:59:59Z TRACE ntex::server::accept] Sent to worker 0
[2023-08-25T04:59:59Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 230 bytes available, wakeup dispatcher
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 85 bytes available, wakeup dispatcher
Connection is established, chossing protocol
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 277 bytes available, wakeup dispatcher
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
HTTP protocol is selected
[2023-08-25T04:59:59Z TRACE ntex::http::service] New http connection, peer address Some(192.168.2.89:50088)
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] start keep-alive timeout 3s
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] trying to read http message
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] http message is received:
Request HTTP/1.1 GET:/
headers:
"sec-websocket-key": "xF6wrL8RlOLLKxti+1r4Hg=="
"connection": "Upgrade"
"sec-websocket-version": "13"
"sec-websocket-extensions": "permessage-deflate; client_max_window_bits"
"upgrade": "websocket"
"sec-websocket-protocol": "mqtt"
"host": "192.168.2.47:8883"
and payload Stream(PayloadDecoder { kind: Cell { value: Eof } })
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] prep io for upgrade handler
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] switching to upgrade service for
Request HTTP/1.1 GET:/
headers:
"sec-websocket-key": "xF6wrL8RlOLLKxti+1r4Hg=="
"connection": "Upgrade"
"sec-websocket-version": "13"
"sec-websocket-extensions": "permessage-deflate; client_max_window_bits"
"upgrade": "websocket"
"sec-websocket-protocol": "mqtt"
"host": "192.168.2.47:8883"

[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex_io::io] not enough data to decode next frame
[2023-08-25T04:59:59Z TRACE ntex_tokio::io] tcp stream is disconnected
[2023-08-25T04:59:59Z TRACE ntex_tokio::io] write task is instructed to terminate
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex_io::io] io is dropped, force stopping io streams IO_STOPPED | IO_STOPPING | IO_STOPPING_FILTERS | WR_PAUSED | KEEPALIVE
[2023-08-25T04:59:59Z TRACE ntex_io::ioref] force close io stream object

not found in `crate::rt`

I created a new project that references ntex-mqtt = "0.11.0-beta.3", and cargo run reports an error.
image

How to use Router?

When using v5::Router as the publish service, how can I access the path wildcard matches in the handler?

how to let ntex-rt and actix-rt work together?

I'm working on a use case that translates MQTT messages to WebSocket-based messages without an MQTT broker. So my tool has to play two roles: MQTT broker and WebSocket client.
My WebSocket client is based on Actix actors and therefore needs actix-rt. I'm glad to see that Actix-MQTT has become ntex-mqtt. My questions:
#1) Is it possible to let ntex-rt and actix-rt work together?
#2) If not, is there any way to run both of them in the tokio runtime?

Thanks as always for the great work, @fafhrd91.

example to have secure MQTT connection to port 8883

I'm trying to use this client to connect to Azure IoT Hub via MQTT on port 8883, but I can't find an API to load the CA root certificate (BaltimoreCyberTrustRoot.crt.pem).

Using the Python Paho library, I have verified that I can connect to Azure IoT Hub via MQTT directly.

Subscribe packet is malformed when a subscription identifier is specified

In subscribe.rs, when a subscription identifier is specified, the subscribe packet is malformed because it is missing a byte. The property length should also account for the property type byte.

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901092 Section 3.8.2.1.2 "11 (0x0B) Byte, Identifier of the Subscription Identifier. Followed by a Variable Byte Integer representing the identifier of the subscription."

Summary of the problem:
MicrosoftTeams-image (6)

Malformed subscribe packet sent with ntex-mqtt:
MicrosoftTeams-image (3)

Two malformed packets sent with ntex-mqtt where subscription id = 1 and subscription id = 2. This shows that the 0x0b byte indicates subscription id property type and the following byte is the actual subscription id.
MicrosoftTeams-image (5)

Valid subscribe packet sent using MQTTnet. The property length is 02 instead of 01.
MicrosoftTeams-image (4)
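
The length rule described above can be sketched as follows: the property length is itself a Variable Byte Integer and must cover both the 0x0B type byte and the encoded identifier. The function names are hypothetical and the code is independent of subscribe.rs; it also does not check the upper bound of the identifier (268,435,455).

```rust
/// Encodes `value` as an MQTT Variable Byte Integer.
pub fn encode_variable_byte_integer(mut value: u32, out: &mut Vec<u8>) {
    loop {
        let mut byte = (value % 128) as u8;
        value /= 128;
        if value > 0 {
            byte |= 0x80; // continuation bit: more bytes follow
        }
        out.push(byte);
        if value == 0 {
            break;
        }
    }
}

/// Encodes a SUBSCRIBE properties section that holds only a subscription identifier.
pub fn encode_subscription_id_property(id: u32) -> Vec<u8> {
    let mut property = vec![0x0B]; // property type byte
    encode_variable_byte_integer(id, &mut property);

    let mut out = Vec::new();
    // the property length covers the type byte plus the encoded identifier
    encode_variable_byte_integer(property.len() as u32, &mut out);
    out.extend_from_slice(&property);
    out
}
```

For subscription id 1 this produces the bytes 02 0B 01, which agrees with the valid packet captured from MQTTnet.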

bridge support

Hi, would you be interested in adding bridge support?

Troubles upgrade to 0.8.x

I'm having some trouble upgrading to 0.8.x:

error[E0432]: unresolved import `rust_tls`
  --> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/http/client/connector.rs:81:17
   |
81 |             use rust_tls::{OwnedTrustAnchor, RootCertStore};
   |                 ^^^^^^^^ use of undeclared crate or module `rust_tls`

error[E0432]: unresolved import `crate::rt::tcp_connect_in`
 --> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/connect/service.rs:5:5
  |
5 | use crate::rt::tcp_connect_in;
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^ no `tcp_connect_in` in `rt`

error[E0432]: unresolved import `crate::rt::Signal`
  --> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/server/builder.rs:10:24
   |
10 | use crate::rt::{spawn, Signal, System};
   |                        ^^^^^^ no `Signal` in `rt`

error[E0432]: unresolved import `crate::rt::tcp_connect`
 --> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/server/test.rs:6:17
  |
6 | use crate::rt::{tcp_connect, System};
  |                 ^^^^^^^^^^^ no `tcp_connect` in `rt`

error[E0433]: failed to resolve: use of undeclared crate or module `webpki_roots`
  --> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/http/client/connector.rs:86:17
   |
86 |                 webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
   |                 ^^^^^^^^^^^^ use of undeclared crate or module `webpki_roots`

Any additional info I should provide?

unresolved import `crate::rt::Signal`

After upgrading to Rust 1.69, I get the error below when trying to compile router_v5 from the examples.

error[E0432]: unresolved import `crate::rt::Signal`
 --> .cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.6.7/src/server/builder.rs:8:24
  |
8 | use crate::rt::{spawn, Signal, System};
  |                        ^^^^^^ no `Signal` in `rt`
  |
  = help: consider importing one of these items instead:
          crate::server::ServerCommand::Signal
          polling::os::kqueue::Signal

PubACK v5 Decoder expect reasonCode+Properties0

Hello,

While evaluating your library I noticed some weird behavior.
I'll try to explain what I noticed and what I expected.
In src\v5\codec\packet\pubacks.rs, PublishAck::decode:
The decoder expects the reason code and the properties to be either both present or both absent.
(That is also how I would interpret the MQTT specification.)

I tested with different MQTT brokers and noticed that every server or client I found
adds a reason code when it is != 0, but doesn't send 0 for the property length.
This results in a remaining length of 3 ( id(2) + reason(1) ).
The code expects 2 ( id(2) ) or 4 ( id(2)+reason(1)+length(1) ).

My question: is this a bug or a feature?

My Rust knowledge is too limited, so this is only an attempt at a fix:

impl PublishAck {
    pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
        let packet_id = NonZeroU16::decode(src)?;
        let (reason_code, properties, reason_string) = if src.has_remaining() {
            let reason_code = src.get_u8().try_into()?;
            let (properties, reason_string) = if src.has_remaining() { // accept missing properties
                ack_props::decode(src)?
            } else {
                (UserProperties::default(), None)
            };
            ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no bytes should be left
            (reason_code, properties, reason_string)
        } else {
            (PublishAckReason::Success, UserProperties::default(), None)
        };

        Ok(Self { packet_id, reason_code, properties, reason_string })
    }
}

Perhaps this helps someone.... I don't know if this is a Bug or not.
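
For reference, the MQTT 5 specification states that a PUBACK with a remaining length of 2 carries no reason code (Success is assumed), and that a remaining length below 4 means there is no property length. Below is a standalone sketch of those cases over a plain byte slice; `PubAckBody` and `decode_puback_body` are hypothetical, do not use the crate's codec types, and leave the properties undecoded.

```rust
/// Hypothetical decoded PUBACK variable header (properties kept as raw bytes).
#[derive(Debug, PartialEq)]
pub struct PubAckBody<'a> {
    pub packet_id: u16,
    pub reason_code: u8,
    pub raw_properties: &'a [u8],
}

/// Decodes the bytes that follow the fixed header of a v5 PUBACK.
pub fn decode_puback_body(src: &[u8]) -> Option<PubAckBody<'_>> {
    match src {
        // remaining length 2: reason code defaults to 0x00 (Success)
        [hi, lo] => Some(PubAckBody {
            packet_id: u16::from_be_bytes([*hi, *lo]),
            reason_code: 0x00,
            raw_properties: &[],
        }),
        // remaining length 3: reason code present, property length omitted;
        // remaining length >= 4: `rest` holds the property length and properties
        [hi, lo, reason, rest @ ..] => Some(PubAckBody {
            packet_id: u16::from_be_bytes([*hi, *lo]),
            reason_code: *reason,
            raw_properties: rest,
        }),
        // fewer than 2 bytes cannot hold a packet identifier
        _ => None,
    }
}
```

Read this way, a remaining length of 3 is a legal packet, which would make the strict 2-or-4 check a decoder bug rather than a feature.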

Unable to access subscription Id

It looks like there is currently no way to access the subscription ID of a subscribe packet:

pub id: Option<NonZeroU32>,

As the wrapping structure is not exposing the packet:

ntex-mqtt/src/v5/control.rs

Lines 109 to 112 in f463864

pub struct Subscribe {
    packet: codec::Subscribe,
    result: codec::SubscribeAck,
}

I guess there should be a packet() function for the v5 subscribe structure.

If you like, I can prepare a PR for this.

MQTT 5 support

I just noticed that there's a v5 branch. Is this going to be integrated into master as the official MQTT 5 support, or is this just a random side gig?

I'd be very interested in contributing and would like to know the way forward here.

Server responds with "Maximum Packet Size" = 0 by default

By default, the MQTT v5 server responds with a maximum packet size of zero:

image

However, according to the spec, this is not allowed:

It is a Protocol Error to include the Maximum Packet Size more than once, or for the value to be set to zero.

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901050

I guess this is due to the fact, that the MqttServer is initialized with 0:

max_size: 0,

To my understanding, in the case of "unlimited", it should simply be omitted:

If the Maximum Packet Size is not present, no limit on the packet size is imposed beyond the limitations in the protocol as a result of the remaining length encoding and the protocol header sizes.
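
One way to make the zero value unrepresentable is to model the property as an optional non-zero integer, so a configured 0 turns into "property omitted". A minimal sketch, assuming 0 is the server's internal marker for "no limit"; `max_packet_size_property` is a hypothetical helper.

```rust
use std::num::NonZeroU32;

/// Maps a configured size limit to the CONNACK "Maximum Packet Size" property.
/// A configured 0 is taken to mean "no limit", so the property is left out.
pub fn max_packet_size_property(configured_max_size: u32) -> Option<NonZeroU32> {
    NonZeroU32::new(configured_max_size)
}
```

An encoder that only writes the property for `Some(..)` can then never emit the forbidden zero.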

v0.8.3 Linux compilation failed

I failed to build ntex-mqtt v0.8.3 in a CentOS 7.3 environment.
Here is the error:

error[E0599]: no method named `is_init_finished` found for reference `&SslRef` in the current scope
   --> /root/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-tls-0.1.3/src/openssl/mod.rs:204:54
    |
204 |                         if self.inner.borrow().ssl().is_init_finished() {
    |                                                      ^^^^^^^^^^^^^^^^ method not found in `&SslRef`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `ntex-tls` due to previous error

openssl version is

OpenSSL 1.1.1i  8 Dec 2020

[BUG] Client sink closes after adding a keepalive value

The client sink shuts down after sending the first keepalive ping. Note the log line where the mqtt client connection is closed.

#[ntex::main]
async fn main() -> std::io::Result<()> {
    std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,client=trace");
    env_logger::init();
    
    // connect to server
    let client = client::MqttConnector::new("127.0.0.1:1883")
        .client_id("user")
        // [BUG] adding a keep-alive timeout causes the client to close
        // after the first ping
        .keep_alive(1)
        .connect()
        .await
        .unwrap();

    let sink = client.sink();

    let router = client.resource("response", publish_v5);

    ntex::rt::spawn(router.start_default());

    let expire = RtInstant::from_std(
                        Instant::now() + Duration::from_secs(1000));
    delay_until(expire).await;

    sink.close();
    Ok(())
}

Logs:

[2021-01-13T00:54:37Z TRACE ntex_codec::framed] attempting to decode a frame
[2021-01-13T00:54:37Z DEBUG ntex_mqtt::v5::client::connection] start mqtt client keep-alive task 1
[2021-01-13T00:54:38Z TRACE ntex_mqtt::v5::client::connection] send ping
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] FLUSH and Stop state
[2021-01-13T00:54:38Z TRACE ntex_codec::framed] flushing framed transport
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] ready
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] Framed flushed, shutdown
[2021-01-13T00:54:38Z TRACE ntex_codec::framed] framed transport flushed and closed
[2021-01-13T00:54:39Z TRACE ntex_mqtt::v5::client::connection] send ping
**[2021-01-13T00:54:39Z DEBUG ntex_mqtt::v5::client::connection] mqtt client connection is closed, stopping keep-alive task**

What's your plan with this project?

Hi. I'm looking for an MQTT broker. The example looks very promising.

  • Is it ready to use?
  • If not, are you open to help?
  • Are you going to do releases?
  • Documentation?

Support for MQTT over websockets

I would be interested in MQTT over websockets. To my understanding, and I may be wrong here, one would "just" need to encapsulate MQTT packets in WebSocket frames.

From what I saw, ntex already supports WebSockets and plain MQTT, so maybe there is a simple way to implement this.
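
One detail of the encapsulation idea is worth sketching. The MQTT specification requires MQTT packets to travel in WebSocket binary frames, and a receiver must not assume that packets are aligned on frame boundaries. The receiving side therefore needs to reassemble packets from frame payloads. The sketch below is standalone and does not use any ntex or ntex-mqtt API; `PacketReassembler`, `push_frame` and `decode_remaining_length` are hypothetical names chosen for illustration.

```rust
/// Decodes the MQTT "remaining length" variable byte integer at the start of `buf`.
/// Returns Ok(Some((value, bytes_used))), Ok(None) if more bytes are needed,
/// or Err if the encoding exceeds four bytes.
fn decode_remaining_length(buf: &[u8]) -> Result<Option<(usize, usize)>, &'static str> {
    let mut value = 0usize;
    for (i, &byte) in buf.iter().enumerate().take(4) {
        value |= ((byte & 0x7F) as usize) << (7 * i);
        if byte & 0x80 == 0 {
            return Ok(Some((value, i + 1)));
        }
    }
    if buf.len() >= 4 {
        Err("malformed remaining length")
    } else {
        Ok(None)
    }
}

/// Accumulates WebSocket binary frame payloads and yields complete MQTT packets.
#[derive(Default)]
struct PacketReassembler {
    buf: Vec<u8>,
}

impl PacketReassembler {
    /// Feeds one frame payload; returns every MQTT packet completed by it.
    fn push_frame(&mut self, payload: &[u8]) -> Result<Vec<Vec<u8>>, &'static str> {
        self.buf.extend_from_slice(payload);
        let mut packets = Vec::new();
        loop {
            // Need the fixed header byte plus at least one length byte.
            if self.buf.len() < 2 {
                break;
            }
            let (remaining, used) = match decode_remaining_length(&self.buf[1..])? {
                Some(v) => v,
                None => break,
            };
            let total = 1 + used + remaining;
            if self.buf.len() < total {
                break; // the packet continues in a later frame
            }
            packets.push(self.buf.drain(..total).collect());
        }
        Ok(packets)
    }
}
```

A frame carrying a whole PINGREQ followed by the start of a PUBLISH would yield one packet immediately, and the PUBLISH only once the next frame delivers its remaining bytes.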

How to define return type of `v3::MqttServer::new`

I'm working on a project that is trying to use ntex-mqtt and so far it seems to work out pretty well. So first of all thanks for this amazing framework!!

Now my issue is with defining the function signature of a function that returns the result of v3::MqttServer::new. The function looks like this:

fn new_server_v3(
    <snip>
) -> v3::MqttServer<ClientSession, ?, ?, ?> {
    v3::MqttServer::new(|handshake: v3::Handshake| async {
       // do some prep stuff

        let session = ClientSession {
            <snip>
        };

        Ok::<_, ServerError>(handshake.ack(session, false))
    })
    .control(control_service_factory_v3())
    .publish(fn_factory_with_config(
        |session: v3::Session<ClientSession>| {
            Ready::Ok::<_, ServerError>(fn_service(move |req| publish_v3(session.clone(), req)))
        },
    ))
}

For readability, and because this function takes some variables that we need again later to create the v5 server, we would really prefer to create this function instead of defining it inline. Can you tell me how to define the type, or how I could find it out by myself?

Thanks!!
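
A general Rust technique that may apply here: closure types cannot be named, but `impl Trait` can stand in for them in return position, including as a generic argument. The sketch below uses a hypothetical `Server<H>` type as a stand-in for a builder that is generic over its handler. It is not ntex-mqtt's `MqttServer`, and the trait bounds that the real type parameters need are not shown in this issue.

```rust
/// Stand-in for a builder type that is generic over its handler (hypothetical).
struct Server<H> {
    handler: H,
}

impl<H: Fn(&str) -> bool> Server<H> {
    fn new(handler: H) -> Self {
        Server { handler }
    }

    /// Runs the stored handler for a given client id.
    fn handle(&self, client_id: &str) -> bool {
        (self.handler)(client_id)
    }
}

/// The closure's concrete type cannot be written out, so the signature
/// names only the trait it implements.
fn new_server(allowed_prefix: String) -> Server<impl Fn(&str) -> bool> {
    Server::new(move |client_id: &str| client_id.starts_with(allowed_prefix.as_str()))
}
```

To discover what the compiler infers for an expression, a common trick is to assign it to a deliberately wrong type, for example `let _: () = value;`, and read the "expected ..., found ..." part of the resulting error.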

How to use acknowledged commands?

Using publish_qos1 instead of publish_qos0 doesn't seem to work as intended. If I .await the result, the packet is never sent over the wire and execution just hangs. If I don't .await (ignoring the loud warning from rustc that async functions must be polled to do anything) and follow up with a publish_qos0, I can see that both packets are batched and sent back-to-back in the same TCP packet to the broker.

I was trying to implement SUBSCRIBE support but that suffers from the same problem.

generic associated types are unstable

I'm new to Rust. When I try to compile the router_v5 code under examples, I get the error below. I would appreciate any help or hints for resolving it.

error[E0658]: generic associated types are unstable
--> .cargo/registry/src/github.com-1ecc6299db9ec823/ntex-service-1.0.2/src/and_then.rs:41:5
|
41 | type Future<'f> = AndThenServiceResponse<'f, A, B, Req> where Self: 'f, Req: 'f;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: see issue #44265 rust-lang/rust#44265 for more information
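
The `type Future<'f>` line in the error uses generic associated types, which have been stable since Rust 1.65, so this error points to an older toolchain and updating it (for example with `rustup update stable`) is the likely fix. The standalone sketch below uses the same language feature and can serve as a quick check that the installed compiler is new enough. The `Windows` trait is hypothetical and unrelated to ntex-service.

```rust
/// A trait with a generic associated type: `Window` borrows from `self`.
trait Windows {
    type Window<'a>
    where
        Self: 'a;

    /// Returns a borrowed view of `len` items starting at `start`, if in range.
    fn window<'a>(&'a self, start: usize, len: usize) -> Option<Self::Window<'a>>;
}

impl Windows for Vec<u8> {
    type Window<'a> = &'a [u8]
    where
        Self: 'a;

    fn window<'a>(&'a self, start: usize, len: usize) -> Option<Self::Window<'a>> {
        // checked_add guards against overflow; get() guards against out-of-range.
        let end = start.checked_add(len)?;
        self.get(start..end)
    }
}
```

If this compiles, the toolchain supports the syntax that ntex-service 1.0.2 relies on.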

Where is the struct ByteString coming from

The publish function for MqttSink takes the following parameters:
pub fn publish(&self, topic: ByteString, payload: Bytes) -> PublishBuilder

However, it is not clear where ByteString comes from.
Currently, I get the following error when I use it like this:

use ntex::util::ByteString;
use bytes::Bytes;

const so: &'static str = "aisfhsaijf";

let client = v3::client::MqttConnector::new("127.0.0.1:1884")
        .client_id("user")
        .keep_alive(1)
        .connect()
        .await
        .unwrap();

    let sink = client.sink();
    let payload = b"dasdad";
    sink.publish(ByteString::from_static(so), Bytes::from_static(payload)).send_at_least_once();

Error:

 |
49 |     sink.publish(ByteString::from_static(so), Bytes::from_static(payload)).send_at_least_once();
   |                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `bytestring::ByteString`, found struct `ByteString`
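
The error message suggests that `publish` expects the `ByteString` type from the `bytestring` crate, while the snippet imports a different type with the same name from `ntex::util`. Rust treats same-named types from different crates as unrelated. The sketch below reproduces the situation with two stand-in modules, `crate_a` and `crate_b`. They are hypothetical and only model the name clash, and which import is correct for a given ntex-mqtt version is an assumption to verify against that version's dependencies.

```rust
mod crate_a {
    /// Stand-in for the type the API expects.
    pub struct ByteString(pub String);

    impl ByteString {
        pub fn from_static(s: &'static str) -> Self {
            ByteString(s.to_owned())
        }
    }
}

mod crate_b {
    /// Same name, different crate: a distinct, incompatible type.
    pub struct ByteString(pub String);

    impl ByteString {
        pub fn from_static(s: &'static str) -> Self {
            ByteString(s.to_owned())
        }
    }
}

/// Accepts only `crate_a::ByteString`; passing `crate_b::ByteString`
/// directly would fail with a mismatched-types error like the one above.
fn publish(topic: crate_a::ByteString) -> usize {
    topic.0.len()
}

/// Explicit conversion between the two look-alike types.
fn convert(topic: crate_b::ByteString) -> crate_a::ByteString {
    crate_a::ByteString(topic.0)
}
```

In practice the simpler fix is usually to import the type from the crate named in the error, or to align the dependency versions so that both names resolve to the same type.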
