MQTT Client/Server framework for ntex with support of v5 and v3.1.1 protocols
ntex-rs / ntex-mqtt Goto Github PK
View Code? Open in Web Editor NEWMQTT Client/Server framework for v5 and v3.1.1 protocols
License: Apache License 2.0
MQTT Client/Server framework for v5 and v3.1.1 protocols
License: Apache License 2.0
I'd like to use actix-mqtt as a MQTT client but I'm struggling a bit getting it to work for anything more than a one-shot message QoS0 PUBLISH message after a CONNECT as exercised by tests/test_server.rs. The problem boils down to the connection automatically being dropped after calling the state
function, also the ClientSession
which is passed to the ack.state
is simply dropped and the rest of the callbacks like finish
never even called.
What is the right way to keep the session open and how can the session be used to actually interface with actors?
As I can see the previous crate https://docs.rs/actix-mqtt/0.2.3/actix_mqtt/struct.MqttServer.html#method.subscribe had the ability to define subscribe and unsubscribe handlers. I hope ntex-mqtt also can. Any help is appreciated.
P.S. BTW Thank you so much @fafhrd91 for the previous answer - my mqtt-to-amqp proxy prototype works as PoC!
Kindly help me for two questions:
SubscriptionOptions
) in one call?fn_service(|control: v5::client::ControlMessage<Error>|
use a mpsc::Sender I created. using the client router client.resource("response", publish)
has the same challenge.Appreciate.
I think it would be helpful to have an example on how to use a custom Session.
Hello ๐๐ป
We've been using this crate for a little while now and thought it was time to update from 0.9
to 0.12
. But ever since we did, we noticed our clients are reconnecting every 15 seconds. To make sure the "problem" still exists in the latest version we pulled this repository and used the crate from a local path, but this still gives the same disconnects.
We did some tests and enabled TRACE
logging which shows us the following interesting bits:
2023-12-08T12:17:18Z TRACE] Starting mqtt v5 handshake
[2023-12-08T12:17:18Z TRACE] Keep alive 1: 10 // We've added this line to see if the keep-alive of the client was read correctly
[2023-12-08T12:17:18Z INFO ] new connection from client-id=be6b355e-2723-4dc5-814a-ab76e8bbd503
[2023-12-08T12:17:18Z TRACE] Sending: ConnectAck {
session_present: false,
reason_code: Success,
session_expiry_interval_secs: None,
receive_max: 65535,
max_qos: AtLeastOnce,
max_packet_size: None,
assigned_client_id: None,
topic_alias_max: 32,
retain_available: true,
wildcard_subscription_available: true,
subscription_identifiers_available: true,
shared_subscription_available: true,
server_keepalive_sec: None,
response_info: None,
server_reference: None,
auth_method: None,
auth_data: None,
reason_string: None,
user_properties: [],
}
[2023-12-08T12:17:18Z TRACE] Keep alive 2: 15 - None // This is again a line we added ourselves for additional debugging
[2023-12-08T12:17:18Z TRACE] Connection handshake succeeded
[2023-12-08T12:17:18Z TRACE] Connection handler is created, starting dispatcher
[2023-12-08T12:17:18Z DEBUG] Start keep-alive timer Seconds(15)
Now after this snippet you see a lot of publish packages coming in (we are testing this with a test client which continuously sends data) and then after 15 seconds we see this part:
[2023-12-08T12:19:29Z TRACE] Keep-alive error, stopping dispatcher
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::KeepAliveTimeout
And in between the publish messages we saw 2 of these:
[2023-12-08T12:19:16Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))
...
[2023-12-08T12:19:29Z TRACE] Dispatch v5 packet: DispatchItem::Item((PingRequest, 0))
The client we are using to test this with, didn't change after we updated this crate from 0.9
to 0.12
so it's either caused by some (default) config that is changed, updated or introduced in 0.12
, or its some kind of bug.
I will continue to debug this one myself as well, but I'm hoping this might sound familiar or sound like something you may have seen before? Any suggestions that might push us in the right direction in order to resolve the disconnects are very much appreciated.
Thanks!
Sander
Looks like MqttServer
incorrectly handles mqtt messages when client disconnects right after sending publications to the server.
When I use mosquitto_pub
to send 1 QoS 0 message and disconnect MqttServer
doesn't process incoming message (don't pass it to publish handler). However it handles QoS 1 message properly.
In both cases it doesn't read DISCONNECT
message send afterwards by a client.
For QoS 0 messages:
[2021-05-14T21:40:50Z INFO ntex::server::builder] Starting 1 workers
[2021-05-14T21:40:50Z INFO ntex::server::builder] Starting "mqtt" service on 0.0.0.0:1883
[2021-05-14T21:40:50Z TRACE ntex::server::accept] Starting server accept loop
[2021-05-14T21:40:50Z TRACE ntex::server::worker] Service "mqtt" is available
[2021-05-14T21:40:56Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 127.0.0.1:1883, peer: 127.0.0.1:54878, fd: 16 })
[2021-05-14T21:40:56Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Start connection handshake
[2021-05-14T21:40:56Z TRACE ntex_mqtt::v3::server] Starting mqtt handshake
[2021-05-14T21:40:56Z INFO ntex_broker] Client mosq-GgswNn5xvTJLbAZeJg connected
[2021-05-14T21:40:56Z TRACE ntex_mqtt::v3::server] Sending success handshake ack: ConnectAck {
session_present: false,
return_code: ConnectionAccepted,
}
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Connection handshake succeeded
[2021-05-14T21:40:56Z TRACE ntex_mqtt::service] Connection handler is created, starting dispatcher
[2021-05-14T21:40:56Z TRACE ntex::framed::state] io stream is disconnected
[2021-05-14T21:40:56Z TRACE ntex::framed::write] write io is closed
[2021-05-14T21:40:56Z TRACE ntex_mqtt::io] dispatcher is instructed to stop
[2021-05-14T21:40:56Z TRACE ntex_mqtt::io] service shutdown is completed, stop
[2021-05-14T21:40:56Z INFO ntex_broker] Client mosq-GgswNn5xvTJLbAZeJg closed connection
[2021-05-14T21:40:56Z INFO ntex_broker] Sink worker for mosq-GgswNn5xvTJLbAZeJg stopped
^C[2021-05-14T21:40:59Z INFO ntex::server::builder] SIGINT received, exiting
[2021-05-14T21:40:59Z TRACE ntex::server::accept] Stopping socket listener: 0.0.0.0:1883
For QoS 1 messages
2021-05-14T21:41:02Z INFO ntex::server::builder] Starting 1 workers
[2021-05-14T21:41:02Z INFO ntex::server::builder] Starting "mqtt" service on 0.0.0.0:1883
[2021-05-14T21:41:02Z TRACE ntex::server::accept] Starting server accept loop
[2021-05-14T21:41:02Z TRACE ntex::server::worker] Service "mqtt" is available
[2021-05-14T21:41:05Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 127.0.0.1:1883, peer: 127.0.0.1:54880, fd: 16 })
[2021-05-14T21:41:05Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Start connection handshake
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::server] Starting mqtt handshake
[2021-05-14T21:41:05Z INFO ntex_broker] Client mosq-mhyIWwydds6JJIbXCh connected
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::server] Sending success handshake ack: ConnectAck {
session_present: false,
return_code: ConnectionAccepted,
}
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Connection handshake succeeded
[2021-05-14T21:41:05Z TRACE ntex_mqtt::service] Connection handler is created, starting dispatcher
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::dispatcher] Dispatch packet: Publish(
Publish {
packet_id: Some(
1,
),
topic: "topic",
dup: false,
retain: false,
qos: AtLeastOnce,
payload: "<REDACTED>",
},
)
[2021-05-14T21:41:05Z INFO ntex_broker] PUBLISH Publish { packet_id: Some(1), topic: "topic", dup: false, retain: false, qos: AtLeastOnce, payload: "<REDACTED>" }
[2021-05-14T21:41:05Z TRACE ntex_mqtt::v3::dispatcher] Publish result for packet Some(1) is ready
[2021-05-14T21:41:05Z TRACE ntex::framed::state] io stream is disconnected
[2021-05-14T21:41:05Z TRACE ntex::framed::write] write io is closed
[2021-05-14T21:41:05Z TRACE ntex_mqtt::io] dispatcher is instructed to stop
[2021-05-14T21:41:05Z TRACE ntex_mqtt::io] service shutdown is completed, stop
[2021-05-14T21:41:05Z INFO ntex_broker] Client mosq-mhyIWwydds6JJIbXCh closed connection
[2021-05-14T21:41:05Z INFO ntex_broker] Sink worker for mosq-mhyIWwydds6JJIbXCh stopped
^C[2021-05-14T21:41:09Z INFO ntex::server::builder] SIGINT received, exiting
[2021-05-14T21:41:09Z TRACE ntex::server::accept] Stopping socket listener: 0.0.0.0:1883
mosquitto_pub command output:
$ mosquitto_pub -h localhost -p 1883 -t topic -d -m hello -q 0 dmolokan@dmolokan-devbox
Client mosq-GgswNn5xvTJLbAZeJg sending CONNECT
Client mosq-GgswNn5xvTJLbAZeJg received CONNACK (0)
Client mosq-GgswNn5xvTJLbAZeJg sending PUBLISH (d0, q0, r0, m1, 'topic', ... (5 bytes))
Client mosq-GgswNn5xvTJLbAZeJg sending DISCONNECT
$ mosquitto_pub -h localhost -p 1883 -t topic -d -m hello -q 1 dmolokan@dmolokan-devbox
Client mosq-mhyIWwydds6JJIbXCh sending CONNECT
Client mosq-mhyIWwydds6JJIbXCh received CONNACK (0)
Client mosq-mhyIWwydds6JJIbXCh sending PUBLISH (d0, q1, r0, m1, 'topic', ... (5 bytes))
Client mosq-mhyIWwydds6JJIbXCh received PUBACK (Mid: 1, RC:0)
Client mosq-mhyIWwydds6JJIbXCh sending DISCONNECT
hello, when i connect to these addr like "broker-cn.emqx.io:8883" or "54.87.92.106:8883" by openssl. The original program will parse hostname to "broker-cn.emqx.io:8883" or "54.87.92.106:8883", but the dns name in cert is "broker-cn.emqx.io", so it will connect fail.
please see these examples that same addr, but the result is different. the reason is Parsing hostname error
Any advice will be appreciated. Do I understand correctly that all subscriptions we accept are not served automatically? So on any incoming publish with some routing key we should manually iterate all sessions and their subscriptions (to find matching) to find sessions and sinks to publish this message to? Or there are better mechanics for that?
Maybe this is already possible, and I just couldn't find a way.
We would need access to the underlying socket of the connection, at least for handling the "connect" request.
Our use case is that, when handling the "connect" packet, we need the have access to the X.509 client certificates (if any) for validation. Both openssl and rustls provide calls to extract this information from the socket. However, I couldn't find a way to get access to the socket.
I do understand that, depending on the implementation type, the socket could be of a different type. However, others seem to solve this by using a dyn &Any
argument, which you can then "downcast", to check if for the concrete type. I guess other ways should also be possible.
If it helps, I would try to create a PR to fix this issue. However, I didn't have any plan yet on how to achieve this.
I have the feeling there is a session leak in the server code in case the server gets "too many" messages.
This is the scenario:
Use the subs.rs
example
Instead of returning Ok()
in the publish method, use:
tokio::time::sleep(Duration::from_secs(1)).await;
Err(MyServerError)
Implement Drop
for MySession
, print out something
Now when you send one message, it processes it, fails, closes the connection and prints out the message from drop
.
If you send e.g. 20 messages at once, it processes all of the, closes the connection, but doesn't print out the message. This is pretty reproducible.
My assumption is, the session is still lingering somewhere.
It looks to me as if TLS currently isn't supported. I think it would be useful to have support for TLS.
Hello @fafhrd91
Has the "session expiration interval" been processed in the framework.
I did not find the relevant code in the source code.
If it is not processed, I need to support the "session expiration interval", what should I do.
At present, I don't know much about the storage and other processing of the "Session" of the framework. I hope I can tell me the specific location of the relevant code.
Is there any chance I can use v3::MqttServer
from within tokio::spawn
?
Like this:
...
#[tokio::main]
async fn main() {
...
let mqtt_server = tokio::spawn(async move {
v3::MqttServer::new(handshake_v3).publish(publish_v3).run().await
});
let _join = tokio::try_join!(mqtt_server).expect("Cannot join spawned futures!");
}
The PubAckReason enum has the value ReceiveMaximumExceeded included with value 147. While this is a valid code for the DISCONNECT packet, my reading of the spec is that it's not a valid value per spec for PUBACK.
Source code:
ntex-mqtt/src/v5/codec/packet/pubacks.rs
Line 41 in 30f505a
MQTT 5 specification reference: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901031
I propose that we should remove this value from the enum, unless there is a compelling reason/dependency on it currently elsewhere that would force it to stay in. I observed this issue when I tried issuing this PUBACK reason and the client I was using (non-ntex-mqtt) was unable to properly parse the result code.
I'm testing using basic example and having three issues with a client disconnect timeout:
1 - The client got disconnected before send ping request to the server;
2 - If I change the server source code to configure timeout as 1.5 times the value of keepalive, client will continue to disconnecting because the ping request package doesn't refresh the keepalive timer as publish packet does;
3 - When client try to disable keepalive (keepalive = 0), the connection are closed immediataly.
The client only connect to server and subscribe to one topic. If client send publish messages periodicaly, the connection keep alive
Hello @fafhrd91, could you offer more informations/examples/documentaions about this crate? I would like to test and try to use it for my new project.
Definitely not a major issue, but I observed that the SubscribeAckReason enum value for SharedSubsriptionNotSupported has Subscription misspelled:
I'm happy to correct the issue myself, but unsure how you want to approach what would technically be a breaking change for consumers of the library? Obviously this enum value being misspelled is not a functionality issue, but I figured I would open the issue in case cleaning it up is of interest to you.
cargo-audit output:
Crate: nanorand
Version: 0.5.2
Title: Aliased mutable references from `tls_rand` & `TlsWyRand`
Date: 2021-09-23
ID: RUSTSEC-2021-0114
URL: https://rustsec.org/advisories/RUSTSEC-2021-0114
Solution: Upgrade to >=0.6.1
Dependency tree:
nanorand 0.5.2
โโโ ntex 0.4.1
โโโ ntex-mqtt 0.7.2
Hello, I am having trouble responding to the subscription.
I imitated the implementation of drogue-cloud/mqtt-integration and wanted to make a Mqtt Broker. https://github.com/drogue-iot/drogue-cloud/blob/main/mqtt-integration/src
Everything looked normal at first.
drogue-cloud/mqtt-integration only implements subscription (Qos 0).
I hope to support Qos 1.
There was a problem when I replaced send_at_most_once() with send_at_least_once()๏ผ
Related information I know so far:
The error is as follows๏ผ
[2021-09-06T11:01:27Z INFO session] Hello
[2021-09-06T11:01:27Z INFO ntex::server::builder] Starting 1 workers
[2021-09-06T11:01:27Z INFO ntex::server::builder] Starting "mqtt" service on 127.0.0.1:1883
[2021-09-06T11:01:34Z INFO session] new connection: Connect { clean_start: true, keep_alive: 60, session_expiry_interval_secs: None, auth_method: None, auth_data: None, request_problem_info: true, request_response_info: false, receive_max: None, topic_alias_max: 0, user_properties: [], max_packet_size: None, last_will: Some(LastWill { qos: AtLeastOnce, retain: false, topic: "aaaaaaaa", message: b"aaaaaaaaaaaaaaaaaaaaaa", will_delay_interval_sec: Some(5), correlation_data: None, message_expiry_interval: Some(5), content_type: Some("aaaaaaaa"), user_properties: [], is_utf8_payload: Some(true), response_topic: None }), client_id: "mqttx_6dfcaa94", username: Some("admin"), password: Some(b"1234567") }
[2021-09-06T11:01:34Z ERROR session] ProtocolError { err: Decode(MalformedPacket), pkt: Disconnect { reason_code: ImplementationSpecificError, session_expiry_interval_secs: None, server_reference: None, reason_string: None, user_properties: [] } }
[2021-09-06T11:01:34Z ERROR session] Disconnected
This is a simple example of modifying examples/session.rs, which is the same as the error I made in practice.(Other codes are the same as examples/session.rs)๏ผ
async fn control_v5<E>(
session: v5::Session<MySession>,
control: v5::ControlMessage<E>,
) -> Result<v5::ControlResult, MyServerError> {
match control {
v5::ControlMessage::Auth(a) => {
// we don't do extended authentication (yet?)
Ok(a.ack(Auth::default()))
}
v5::ControlMessage::Error(e) => Ok(e.ack(DisconnectReasonCode::UnspecifiedError)),
v5::ControlMessage::ProtocolError(pe) => {
error!("{:?}", pe);
Ok(pe.ack())
}
v5::ControlMessage::Ping(p) => Ok(p.ack()),
v5::ControlMessage::Disconnect(d) => Ok(d.ack()),
v5::ControlMessage::Subscribe(mut s) => {
s.iter_mut().for_each(|mut s| {
let sink = session.sink().clone();
let topic = s.topic().clone();
ntex::rt::spawn(async move {
match sink.publish(topic, Bytes::from("A")).send_at_least_once().await {
Ok(w) => {
error!("{:?}", w)
}
Err(e) => {
error!("{:?}", e)
}
};
});
s.subscribe(v5::QoS::AtLeastOnce);
});
Ok(s.ack())
}
v5::ControlMessage::Unsubscribe(u) => Ok(u.ack()),
v5::ControlMessage::Closed(c) => Ok(c.ack()),
}
}
How should I respond to the subscription correctly (Qos 1)๏ผ
Hello, in mqtt-ws-server.rs example, I want to remove openssl. How to modify it?
i run the ws example,and use mqttx to connect with protocol wss,but could not establish the connection.
โ ntex-mqtt git:(master) โ cargo run --example mqtt-ws-server Compiling ntex-mqtt v0.11.4 (/home/boring/workspace/ntex-mqtt) Finished dev [unoptimized + debuginfo] target(s) in 4.76s Running
target/debug/examples/mqtt-ws-server`
[2023-08-25T04:59:52Z INFO ntex::server::builder] Starting 1 workers
[2023-08-25T04:59:52Z INFO ntex::server::builder] Starting "mqtt" service on 0.0.0.0:8883
[2023-08-25T04:59:52Z TRACE ntex::server::accept] Starting server accept loop
[2023-08-25T04:59:52Z INFO ntex::server::accept] Starting socket listener on 0.0.0.0:8883
[2023-08-25T04:59:52Z TRACE ntex::server::worker] Service "mqtt" is available
[2023-08-25T04:59:52Z TRACE ntex::server::accept] Worker is available
[2023-08-25T04:59:59Z TRACE ntex::server::accept] Accepting connection: Tcp(TcpStream { addr: 192.168.2.47:8883, peer: 192.168.2.89:50088, fd: 17 }) bp: false
[2023-08-25T04:59:59Z TRACE ntex::server::accept] Sent to worker 0
[2023-08-25T04:59:59Z TRACE ntex::server::worker] Got socket for service: "mqtt"
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 230 bytes available, wakeup dispatcher
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 85 bytes available, wakeup dispatcher
Connection is established, chossing protocol
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
[2023-08-25T04:59:59Z TRACE ntex_io::tasks] new 277 bytes available, wakeup dispatcher
[2023-08-25T04:59:59Z TRACE ntex_io::io] waking up io read task
HTTP protocol is selected
[2023-08-25T04:59:59Z TRACE ntex::http::service] New http connection, peer address Some(192.168.2.89:50088)
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] start keep-alive timeout 3s
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] trying to read http message
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] http message is received:
Request HTTP/1.1 GET:/
headers:
"sec-websocket-key": "xF6wrL8RlOLLKxti+1r4Hg=="
"connection": "Upgrade"
"sec-websocket-version": "13"
"sec-websocket-extensions": "permessage-deflate; client_max_window_bits"
"upgrade": "websocket"
"sec-websocket-protocol": "mqtt"
"host": "192.168.2.47:8883"
and payload Stream(PayloadDecoder { kind: Cell { value: Eof } })
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] prep io for upgrade handler
[2023-08-25T04:59:59Z TRACE ntex::http::h1::dispatcher] switching to upgrade service for
Request HTTP/1.1 GET:/
headers:
"sec-websocket-key": "xF6wrL8RlOLLKxti+1r4Hg=="
"connection": "Upgrade"
"sec-websocket-version": "13"
"sec-websocket-extensions": "permessage-deflate; client_max_window_bits"
"upgrade": "websocket"
"sec-websocket-protocol": "mqtt"
"host": "192.168.2.47:8883"
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex_io::io] not enough data to decode next frame
[2023-08-25T04:59:59Z TRACE ntex_tokio::io] tcp stream is disconnected
[2023-08-25T04:59:59Z TRACE ntex_tokio::io] write task is instructed to terminate
[2023-08-25T04:59:59Z DEBUG ntex_io::ioref] unregister keep-alive timeout
[2023-08-25T04:59:59Z TRACE ntex_io::io] io is dropped, force stopping io streams IO_STOPPED | IO_STOPPING | IO_STOPPING_FILTERS | WR_PAUSED | KEEPALIVE
[2023-08-25T04:59:59Z TRACE ntex_io::ioref] force close io stream object
`
The subs.rs example here https://github.com/petergmiller/ntex-mqtt/blob/master/examples/subs.rs uses .subscribe to indicate successful subscription, as opposed to using confirm. While either appear to be acceptable (as subscribe just calls confirm), it seems that confirm would be preferable as subscribe is hidden in docs (
Line 287 in 5814176
This is useful for throttling and bandwidth restrictions.
When using v5::Router
as the publish
service, how can I access the path wildcard matches in the handler?
I'm working on a use case to translate MQTT message to a WebSocket based message without MQTT Broker. So, my tool has to play two roles: MQTT broker and Websocket client.
My WebSocket client is based on Actix Actor and therefore it needs actix-rt. Glad to see that Actix-MQTT becomes ntex-mqtt now, my questions:
#1) Is it possible to let ntex-rt and actix-rt work together?
#2) if not, is there any way to let both of them running in tokio runtime?
Always thanks to the great work done by. @fafhrd91
I'm trying to use this client and connect to Azure IoTHub via MQTT on port 8883 but I can't find an API to load CA root cert. (BaltimoreCyberTrustRoot.crt.pem)
I can use Python Paho lib to valid and have connected to Azure IoTHub via MQTT directly.
In subscribe.rs, when a subscription identifier is specified, the subscribe packet is malformed because it is missing a byte. The property length should also account for the property type byte.
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901092 Section 3.8.2.1.2 "11 (0x0B) Byte, Identifier of the Subscription Identifier. Followed by a Variable Byte Integer representing the identifier of the subscription."
Malformed subscribe packet sent with ntex-mqtt:
Two malformed packets sent with ntex-mqtt where subscription id = 1 and subscription id = 2. This shows that the 0x0b byte indicates subscription id property type and the following byte is the actual subscription id.
Valid subscribe packet sent using MQTTnet. The property length is 02 instead of 01.
Can you make an introduction and suggestion
Hi, would you be interested in adding bridge support?
Having some troubles with upgrade to 0.8.x:
error[E0432]: unresolved import `rust_tls`
--> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/http/client/connector.rs:81:17
|
81 | use rust_tls::{OwnedTrustAnchor, RootCertStore};
| ^^^^^^^^ use of undeclared crate or module `rust_tls`
error[E0432]: unresolved import `crate::rt::tcp_connect_in`
--> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/connect/service.rs:5:5
|
5 | use crate::rt::tcp_connect_in;
| ^^^^^^^^^^^^^^^^^^^^^^^^^ no `tcp_connect_in` in `rt`
error[E0432]: unresolved import `crate::rt::Signal`
--> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/server/builder.rs:10:24
|
10 | use crate::rt::{spawn, Signal, System};
| ^^^^^^ no `Signal` in `rt`
error[E0432]: unresolved import `crate::rt::tcp_connect`
--> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/server/test.rs:6:17
|
6 | use crate::rt::{tcp_connect, System};
| ^^^^^^^^^^^ no `tcp_connect` in `rt`
error[E0433]: failed to resolve: use of undeclared crate or module `webpki_roots`
--> /home/andrey/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.5.6/src/http/client/connector.rs:86:17
|
86 | webpki_roots::TLS_SERVER_ROOTS.0.iter().map(|ta| {
| ^^^^^^^^^^^^ use of undeclared crate or module `webpki_roots`
Any additional info I should provide?
Upgraded to 1.69 rust and getting the below error when trying to compile router_v5 from examples.
error[E0432]: unresolved import crate::rt::Signal
--> .cargo/registry/src/github.com-1ecc6299db9ec823/ntex-0.6.7/src/server/builder.rs:8:24
|
8 | use crate::rt::{spawn, Signal, System};
| ^^^^^^ no Signal
in rt
|
= help: consider importing one of these items instead:
crate::server::ServerCommand::Signal
polling::os::kqueue::Signal
Hello,
on evaluating your library I noticed some weired behavior.
I try to explain what noticed and what expected
in src\v5\codec\packet\pubacks.rs PublishAck::decode
The Decoder expects that reason code and properties are present or absent.
(The same I would interpret the MQTT specification)
I tested with different MQTT Brokers and notices then every server or Client I found
allows ReasonCode != 0 to add, but dont send 0 for propertyLength.
This Results in a remaining of 3. ( id(2) + reason(1) )
The Code expect 2 ( id(2) or 4 ( id(2)+reason(1)+length(1) ).
My Question: is this a Bug or a Feature ?
My Rust Knowledge is to low, so this is only a test to fix :
impl PublishAck {
pub(crate) fn decode(src: &mut Bytes) -> Result<Self, DecodeError> {
let packet_id = NonZeroU16::decode(src)?;
let (reason_code, properties, reason_string) = if src.has_remaining() {
let reason_code = src.get_u8().try_into()?;
let (properties, reason_string) = if src.has_remaining() { // accept missing properties
ack_props::decode(src)?
} else {
(UserProperties::default(), None)
};
ensure!(!src.has_remaining(), DecodeError::InvalidLength); // no bytes should be left
(reason_code, properties, reason_string)
} else {
(PublishAckReason::Success, UserProperties::default(), None)
};
Ok(Self { packet_id, reason_code, properties, reason_string })
}
Perhaps this helps someone.... I don't know if this is a Bug or not.
It looks like there is currently no way to access the subscription ID of a subscribe packet:
As the wrapping structure is not exposing the packet:
Lines 109 to 112 in f463864
I guess there should be a packet()
function for the v5 subscribe structure.
If you like, I can prepare a PR for this.
I just noticed that there's a v5 branch. Is this going to be integrated into master as the official MQTT 5 support or is this just a random side gig.
I'd be very interested in contributing and would like to know the way forward here.
By default, the MQTT v5 server responds with a maximum packet size of zero:
However, according to the spec, this is not allowed:
It is a Protocol Error to include the Maximum Packet Size more than once, or for the value to be set to zero.
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901050
I guess this is due to the fact, that the MqttServer
is initialized with 0
:
Line 61 in 828e696
To my understanding, in the case of "unlimited", it should simply be omitted:
If the Maximum Packet Size is not present, no limit on the packet size is imposed beyond the limitations in the protocol as a result of the remaining length encoding and the protocol header sizes.
cargo run --example basic
mosquitto_sub -t "hello/world" -d
[2021-05-21T04:04:18Z WARN ntex_mqtt::v3::default] MQTT Subscribe is not supported
I failed to build in centos7.3 environment with ntex-mqtt v0.8.3 version.
Here is the error:
error[E0599]: no method named `is_init_finished` found for reference `&SslRef` in the current scope
--> /root/.cargo/registry/src/github.com-1ecc6299db9ec823/ntex-tls-0.1.3/src/openssl/mod.rs:204:54
|
204 | if self.inner.borrow().ssl().is_init_finished() {
| ^^^^^^^^^^^^^^^^ method not found in `&SslRef`
For more information about this error, try `rustc --explain E0599`.
error: could not compile `ntex-tls` due to previous error
openssl version is
OpenSSL 1.1.1i 8 Dec 2020
The client sink shuts shown after sending the first keepalive ping. Notice the logs where mqtt client connection is closed.
#[ntex::main]
async fn main() -> std::io::Result<()> {
std::env::set_var("RUST_LOG", "ntex=trace,ntex_mqtt=trace,client=trace");
env_logger::init();
// connect to server
let client = client::MqttConnector::new("127.0.0.1:1883")
.client_id("user")
// [BUG] adding a keep a keep alive timeout causes the client to close
// after first ping
.keep_alive(1)
.connect()
.await
.unwrap();
let sink = client.sink();
let router = client.resource("response", publish_v5);
ntex::rt::spawn(router.start_default());
let expire = RtInstant::from_std(
Instant::now() + Duration::from_secs(1000));
delay_until(expire).await;
sink.close();
Ok(())
}
Logs:
2021-01-13T00:54:37Z TRACE ntex_codec::framed] attempting to decode a frame
[2021-01-13T00:54:37Z DEBUG ntex_mqtt::v5::client::connection] start mqtt client keep-alive task 1
[2021-01-13T00:54:38Z TRACE ntex_mqtt::v5::client::connection] send ping
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] FLUSH and Stop state
[2021-01-13T00:54:38Z TRACE ntex_codec::framed] flushing framed transport
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] ready
[2021-01-13T00:54:38Z TRACE ntex_mqtt::framed] Framed flushed, shutdown
[2021-01-13T00:54:38Z TRACE ntex_codec::framed] framed transport flushed and closed
[2021-01-13T00:54:39Z TRACE ntex_mqtt::v5::client::connection] send ping
**[2021-01-13T00:54:39Z DEBUG ntex_mqtt::v5::client::connection] mqtt client connection is closed, stopping keep-alive task**
Hi. I'm looking for an mqtt broker. The example looks very promising.
I would be interested in MQTT over websockets. To my understanding, and I may be wrong here, one would "just" need to encapsulate MQTT packets in WebSocket frames.
From what I saw, ntex has support for WebSockets and brings plain MQTT already. So maybe, there is a simple way to implement this.
I'm working on a project that is trying to use ntex-mqtt
and so far it seems to work out pretty well. So first of all thanks for this amazing framework!!
Now my issue is with defining the function signature of a function that returns the result of v3::MqttServer::new
. The function looks like this:
fn new_server_v3(
<snip>
) -> v3::MqttServer<ClientSession, ?, ?, ?> {
v3::MqttServer::new(|handshake: v3::Handshake| async {
// do some prep stuff
let session = ClientSession {
<snip>
};
Ok::<_, ServerError>(handshake.ack(session, false))
})
.control(control_service_factory_v3())
.publish(fn_factory_with_config(
|session: v3::Session<ClientSession>| {
Ready::Ok::<_, ServerError>(fn_service(move |req| publish_v3(session.clone(), req)))
},
))
}
For readability and because this function takes some variables that we need later again to create the v5 server, we really prefer to create this function instead of defining it inline. Can you maybe tell me how to define the type of how I should be able to find that out by myself?
Thanks!!
As a server, how to ensure that the client must verify the account and password when connecting
Trying to use publish_qos1
instead of publish_qos0
doesn't seem to work as intended, if I .await
the result the packet is never sent over the wire and the execution just hangs. If I don't .await
(and ignore the violent warning from rustc that async functions must be polled to achieve something) and follow uo with a publish_qos0
I can see that both packets are batched up and sent back-to-back in the same TCP packet to the broker.
I was trying to implement SUBSCRIBE support but that suffers from the same problem.
I'm a NOOB to Rust, but when I try to compile the code under examples router_v5, getting the below error. Would appreciate any help/hints in resolving this
error[E0658]: generic associated types are unstable
--> .cargo/registry/src/github.com-1ecc6299db9ec823/ntex-service-1.0.2/src/and_then.rs:41:5
|
41 | type Future<'f> = AndThenServiceResponse<'f, A, B, Req> where Self: 'f, Req: 'f;
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: see issue #44265 rust-lang/rust#44265 for more information
the publish function for MqttSink takes the following parameters:
pub fn publish(&self, topic: ByteString, payload: Bytes) -> PublishBuilder
However, it is not clear where the ByteString is coming from
Currently, I get the following error if I use this:
use ntex::util::ByteString;
use bytes::Bytes;
const so: &'static str = "aisfhsaijf";
let client = v3::client::MqttConnector::new("127.0.0.1:1884")
.client_id("user")
.keep_alive(1)
.connect()
.await
.unwrap();
let sink = client.sink();
let payload = b"dasdad";
sink.publish(ByteString::from_static(so), Bytes::from_static(payload)).send_at_least_once();
Error:
|
49 | sink.publish(ByteString::from_static(so), Bytes::from_static(payload)).send_at_least_once();
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected struct `bytestring::ByteString`, found struct `ByteString`
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.