rust-mqtt's Introduction

Rust-mqtt

About

Rust-mqtt is a native MQTT client for both std and no_std environments. The client library provides an async API that can be used with various executors. Currently only MQTTv5 is supported, but everything is prepared to extend support to MQTTv3, which is planned for 2022.

Async executors

For desktop usage I recommend the Tokio async executor. For embedded use, there is a prepared wrapper for the Drogue Device framework in the mqtt module of the Drogue IoT project examples.

Constraints

The client has the following capabilities and limitations:

  • QoS 0 & QoS 1 (All QoS 2 packets are mapped for future client extension)
  • Only clean session
  • Retain not supported
  • Auth packet not supported
  • Packet size is not limited; it is entirely up to the user (packet size and buffer sizes have to align)

Building

cargo build

Running tests

Integration tests are written using the Tokio network TCP stack and can be found under tokio_net.

cargo test unit
cargo test integration
cargo test load

Minimum supported Rust version (MSRV)

Rust-mqtt is guaranteed to compile on stable Rust 1.75 and up. It might compile with older versions but that may change in any new patch release.

Acknowledgment

This project would not be in its current state without Ulf Lilleengen and the rest of the community from Drogue IoT.

Contact

For any information, contact me by email at [email protected]

rust-mqtt's People

Contributors

badrbouslikhin, eldruin, lulf, matoushybl, obabec, sambenko

rust-mqtt's Issues

Support for embedded-tls?

Hi, I've been trying to get rust-mqtt to work with embedded-tls. I'm not sure if it is supported at all (only sign I found is the 'tls' feature)

Originally for an ESP32C3, but switched to std/tokio until I've got this working.

I've managed to get an encrypted connection up and running, and pass that into the rust-mqtt client, but it seems to hang in the connect_to_broker method, until it finally times out.

Maybe something doesn't get flushed properly? I can dig more, but I thought I'd reach out first.

Gist of my example: https://gist.github.com/flyaruu/71f9448474be28f0c9050fdcc86cbba8

(If I remove the TLS stuff, and pass the tcp connection straight into the mqtt_client (with a FromTokio) it works fine)

Any ideas?

How to add per-message properties

I would like to add the Property::ContentType to the mqtt messages. connect_to_broker() does not include the property as property_allowed() returns false for that property and send_message() does not include any properties from the config.
Ideally there should be some way of specifying per-message properties - is this currently possible? I may be missing something.

`Client::receive_message` is not cancel safe

Hello. Client::receive_message takes &mut self, so it is not possible to send a message while listening for messages without cancelling the future, for example via select. So whenever I need to send a message, I stop listening on the network and send it. But looking at the source code, I found that receive_message is not cancel safe: data may be lost.

#[cfg(not(feature = "tls"))]
async fn receive_packet<'c, T: Read + Write>(
    buffer: &mut [u8],
    buffer_len: usize,
    recv_buffer: &mut [u8],
    conn: &'c mut NetworkConnection<T>,
) -> Result<usize, ReasonCode> {
    use crate::utils::buffer_writer::RemLenError;

    let target_len: usize;
    let mut rem_len: Result<VariableByteInteger, RemLenError>;
    let mut writer = BuffWriter::new(buffer, buffer_len);
    let mut i = 0;

    // Get len of packet
    trace!("Reading lenght of packet");
    loop {
        trace!("    Reading in loop!");
        let len: usize = conn
            .receive(&mut recv_buffer[writer.position..(writer.position + 1)])
            .await?;
        trace!("    Received data!");
        if len == 0 {
            trace!("Zero byte len packet received, dropping connection.");
            return Err(ReasonCode::NetworkError);
        }
        i += len;
        if let Err(_e) = writer.insert_ref(len, &recv_buffer[writer.position..i]) {
            error!("Error occurred during write to buffer!");
            return Err(ReasonCode::BuffError);
        }
        if i > 1 {
            rem_len = writer.get_rem_len();
            if rem_len.is_ok() {
                break;
            }
            if i >= 5 {
                error!("Could not read len of packet!");
                return Err(ReasonCode::NetworkError);
            }
        }
    }
    trace!("Lenght done!");
    let rem_len_len = i;
    i = 0;
    if let Ok(l) = VariableByteIntegerDecoder::decode(rem_len.unwrap()) {
        trace!("Reading packet with target len {}", l);
        target_len = l as usize;
    } else {
        error!("Could not decode len of packet!");
        return Err(ReasonCode::BuffError);
    }

    loop {
        if writer.position == target_len + rem_len_len {
            trace!("Received packet with len: {}", (target_len + rem_len_len));
            return Ok(target_len + rem_len_len);
        }
        let len: usize = conn
            .receive(&mut recv_buffer[writer.position..writer.position + (target_len - i)])
            .await?;
        i += len;
        if let Err(_e) =
            writer.insert_ref(len, &recv_buffer[writer.position..(writer.position + i)])
        {
            error!("Error occurred during write to buffer!");
            return Err(ReasonCode::BuffError);
        }
    }
}

For example, if the future is cancelled somewhere in the loop between awaits, all data previously read into the buffer remains there, but the writer keeps its state on the stack, so writer.position is lost when the current stack frame is destroyed during cancellation. conn will then continue to yield bytes from the middle of the packet, so any subsequent receive_packet call will likely end with an error.
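One way to avoid losing the read position on cancellation is to keep the framing state in a struct that outlives the future instead of in locals of the async function. The sketch below is illustrative only and is not rust-mqtt's code: PacketReader, feed, and decode_remaining_length are hypothetical names, and it uses a heap-allocated Vec for brevity where a no_std client would use a fixed buffer. It assumes the caller awaits one receive at a time and hands whatever bytes arrived to feed.

```rust
/// Framing state for an in-flight packet (hypothetical type, not part of rust-mqtt).
/// Because the bytes live here and not on an async stack frame, dropping a
/// pending read future between awaits does not discard what was already received.
#[derive(Debug, Default)]
pub struct PacketReader {
    buf: Vec<u8>,
}

impl PacketReader {
    /// Append newly received bytes. Returns a whole packet once the fixed header
    /// and the announced remaining length have arrived, otherwise `None`.
    pub fn feed(&mut self, bytes: &[u8]) -> Result<Option<Vec<u8>>, &'static str> {
        self.buf.extend_from_slice(bytes);
        let (remaining, header_len) = match decode_remaining_length(&self.buf)? {
            Some(v) => v,
            None => return Ok(None), // length field not complete yet
        };
        let total = header_len + remaining;
        if self.buf.len() < total {
            return Ok(None); // body not complete yet
        }
        // Keep any bytes that already belong to the next packet.
        let rest = self.buf.split_off(total);
        let packet = std::mem::replace(&mut self.buf, rest);
        Ok(Some(packet))
    }
}

/// Decode the MQTT "remaining length" variable byte integer that follows the
/// first header byte. Returns (value, header length in bytes), or `None` when
/// more bytes are needed. The field is at most four bytes long.
fn decode_remaining_length(buf: &[u8]) -> Result<Option<(usize, usize)>, &'static str> {
    let mut value = 0usize;
    for (i, &b) in buf.iter().skip(1).take(4).enumerate() {
        value |= ((b & 0x7F) as usize) << (7 * i);
        if b & 0x80 == 0 {
            return Ok(Some((value, i + 2)));
        }
    }
    if buf.len() >= 5 {
        Err("malformed remaining length")
    } else {
        Ok(None)
    }
}
```

With this shape, a select loop can drop the pending receive and send a packet; the next receive continues filling the same PacketReader. Whether the underlying transport's own receive call is cancel safe is a separate question that this sketch does not address.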

Connection dropping when no new messages in subscribed topic for some time

I am connecting to my MQTT server this way:

let mut client =
        MqttClient::<_, 5, _>::new(socket, &mut write_buffer, 80, &mut recv_buffer, 80, config);
    debug!("BROKER CONNECTING");
    client.connect_to_broker().await.unwrap();
    debug!("BROKER AFTER CONNECTING");
    let mut topic_names = Vec::<_, 2>::new();
    topic_names.push("switch_0").unwrap();
    topic_names.push("switch_1").unwrap();

    client.subscribe_to_topics(&topic_names).await.unwrap();
    Timer::after_millis(500).await;

    loop {
      
        Timer::after_millis(500).await;
        let (topic, message) = match client.receive_message().await {
            Ok(msg) => msg,
            Err(err) => {
                error!("ERROR OCCURED: {}", err);
                continue;
            }
        };
        info!("topic: {}, message: {}", topic, message);
    }

The connection works and receives messages, but when there is no new message in the subscribed MQTT topic for around 30 seconds, I get a NetworkError. I tried the following solution:

select_biased! {
           receive_res = client.receive_message().fuse() => {
               let (topic, message) = receive_res.unwrap();
               info!("topic: {}, message: {}", topic, message);
           }
           result = client.send_ping().fuse() => {
               info!("Ping: {}", result);
           }
       };

to sustain the connection, but I cannot borrow client twice (I think #23 may be needed to handle it this way). Is it normal for the connection to drop when there are no new messages? Shouldn't send_ping() be called inside the receive_message function?

Or am I just using this library the wrong way?
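One possible explanation, which is an assumption and not something confirmed in this thread, is the MQTT keep-alive: the specification lets a broker close the connection when it receives no control packet within one and a half times the keep-alive interval. A socket timeout could also be the cause. The sketch below shows only the bookkeeping for deciding when a PINGREQ is due. KeepAlive and its methods are hypothetical names, not rust-mqtt API, and pinging at half the interval is an arbitrary safety margin.

```rust
use std::time::{Duration, Instant};

/// Tracks when the next PINGREQ is due (hypothetical helper, not rust-mqtt API).
pub struct KeepAlive {
    interval: Duration,
    last_sent: Instant,
}

impl KeepAlive {
    pub fn new(interval: Duration, now: Instant) -> Self {
        KeepAlive { interval, last_sent: now }
    }

    /// Call after any control packet is sent; every outgoing packet resets the timer.
    pub fn on_packet_sent(&mut self, now: Instant) {
        self.last_sent = now;
    }

    /// Time left before a ping should go out. Zero means "ping now".
    /// The deadline is half the keep-alive interval after the last send.
    pub fn time_until_ping(&self, now: Instant) -> Duration {
        let deadline = self.last_sent + self.interval / 2;
        deadline.saturating_duration_since(now)
    }
}
```

A receive loop could wait for incoming data for at most time_until_ping(now), send a ping when that reaches zero, and then call on_packet_sent.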

Support for concurrent send&recv

As of now, the client does not support concurrent send and receive actions.
It would be very welcome to enable this so that embedded devices with limited
socket capacity can do both.

Implement the futures Stream trait for subscriptions

Instead of having a receive_message() method on the client, have the subscribe_* methods return a type that holds a mutable reference to the client and implements the Stream trait from the futures crate. Usage:

let mut sub = client.subscribe_to_topic(...).await;

// Stream::next() yields Option<Item>, so loop until the stream ends
while let Some(message) = sub.next().await {
    // Process message
}

Then when sub goes out of scope, the client can be reused.
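The borrowing behaviour described here can be sketched without any async machinery. The example below is a simplified, synchronous analogue: it uses the standard Iterator trait in place of the futures Stream trait, and MockClient and Subscription are hypothetical types, not rust-mqtt API. It only illustrates that the subscription holds the exclusive borrow and releases it when dropped.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the MQTT client; it only queues messages in memory.
pub struct MockClient {
    pub inbox: VecDeque<String>,
    pub sent: Vec<String>,
}

/// Holds the exclusive borrow of the client for as long as it is alive.
pub struct Subscription<'a> {
    client: &'a mut MockClient,
}

impl MockClient {
    /// Returns a handle that yields incoming messages.
    pub fn subscribe(&mut self) -> Subscription<'_> {
        Subscription { client: self }
    }

    pub fn send(&mut self, msg: &str) {
        self.sent.push(msg.to_string());
    }
}

// Synchronous analogue of `Stream`: each call yields the next queued message.
impl Iterator for Subscription<'_> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        self.client.inbox.pop_front()
    }
}
```

While a Subscription is in use, the compiler rejects calls such as client.send(...). Once the handle is dropped, the client is usable again. Note that this design by itself still serializes sending and receiving.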

Connecting to a Socket from embassy-net 0.2.1

Hello,
I am getting an error passing the socket to the Mqtt client.
My code is as follows:

    use embassy_net::{
        tcp::TcpSocket,
        {dns::DnsQueryType, Stack},
    };
    use rust_mqtt::{
        client::{client::MqttClient, client_config::ClientConfig},
        utils::rng_generator::CountingRng,
    };
    [...]
    let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
    socket.set_timeout(Some(embassy_time::Duration::from_secs(10)));

     let mut client =
            MqttClient::<_, 5, _>::new(socket, &mut write_buffer, 80, &mut recv_buffer, 80, config);

The client initialization is throwing an error: the trait embedded_io::asynch::Write is not implemented for TcpSocket<'_>
How should I work around this error?

Thanks!

Support for receiving message only if data is ready

Currently it seems to be hard to use one socket to both send and receive messages. I'm not sure whether it's actually possible to read messages that might have a gap longer than the connection timeout, since receive_message will block until it receives a message, which means you can't send a ping to keep the connection alive, leading to the issue in #36. #38 suggests that this can't be done with a select as recommended in #36, since it seems that data could be lost if the receive is cancelled after reading partial data.

I think I've got at least a partial solution to this - at least with some io implementations we will have ReadReady available as well as Read + Write, and this allows checking whether there is at least one byte of data available to be read immediately. This means we can implement a new receive_message_if_ready method - this will return immediately if there is no data available to read immediately, otherwise it will pass through to the existing receive_message method.
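The control flow of the proposed method can be sketched as follows. This is not the PR's implementation: ReadyTransport is a hypothetical, simplified synchronous stand-in for a transport offering Read plus ReadReady, and MockConn exists only to make the example self-contained.

```rust
/// Hypothetical stand-in for a connection that can report whether data is waiting.
pub trait ReadyTransport {
    /// True when at least one byte can be read without blocking.
    fn read_ready(&mut self) -> bool;
    /// Blocks until a whole message has been read.
    fn read_message(&mut self) -> Vec<u8>;
}

/// Returns immediately with `None` when nothing is waiting; otherwise
/// passes through to the blocking read.
pub fn receive_message_if_ready<T: ReadyTransport>(conn: &mut T) -> Option<Vec<u8>> {
    if conn.read_ready() {
        Some(conn.read_message())
    } else {
        None
    }
}

/// In-memory connection used only for illustration.
pub struct MockConn {
    pub pending: Vec<Vec<u8>>,
}

impl ReadyTransport for MockConn {
    fn read_ready(&mut self) -> bool {
        !self.pending.is_empty()
    }

    fn read_message(&mut self) -> Vec<u8> {
        self.pending.remove(0)
    }
}
```

A polling loop would call receive_message_if_ready, send a ping when one is due, and sleep briefly between iterations.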

Technically this doesn't seem like a full fix - you could still have a case where a partial message comes through and then the rest is delayed, however I don't think this is really a problem. If you have a partial message then no more data received for more than a few seconds, it probably indicates that the connection or server is down and there's not much point in returning early from receive to e.g. send a ping, and I'd guess eventually the socket will either recover or close. It does fix the presumably much more common issue where everything is fine on the server, client and connection - there's just a long gap between incoming messages, but each message comes through relatively quickly after the first byte.

I'll submit a PR for the implementation, it doesn't really change anything much except adding the "if_ready" version of receive and poll methods, in the case where you have ReadReady available.

Side note - there is currently an issue with embassy-net where read_ready() is not implemented correctly for TcpSocket, this is fixed in main but not in the current 0.4.0 release. I've tested this approach on an esp32c3 with the fixed embassy-net and it seems to work fine for a loop with receive_message_if_ready() and send_ping(), staying connected overnight even if no messages are being received.

Connecting to socket from embassy-net 0.4.0

I'm getting a linker error when I try to connect to a broker on an embassy-net socket.
Here is some info:


embassy-net = { version = "0.4.0", features = ["medium-ip", "tcp", "medium-ethernet", "dhcpv4", "log", "udp", "dns"] }

rust-mqtt = { version = "0.3.0", features = ["no_std"], default-features = false }

esp-wifi        = { version = "0.3.0", features = ["esp32c3", "wifi-logs", "wifi", "utils", "wifi-default", "embassy-net", "async"] }


//dns query of mqtt broker ip
let mqtt_ip: Vec<_, 1>;
loop{
    println!("DNS query to MQTT server...");
    if let Ok(ip_addr) = stack.dns_query("broker.mqttdashboard.com", embassy_net::dns::DnsQueryType::A).await {
        println!("DNS query to MQTT server success!");
        mqtt_ip = ip_addr;
        break;
    }
    Timer::after(Duration::from_millis(1000)).await;
}

println!("MQTT IP is: {:?}", mqtt_ip);

//connect to mqtt broker
println!("Connecting to socket...");
let endpoint = IpEndpoint::new(mqtt_ip[0], 1883);
let mut socket = embassy_net::tcp::TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer);
socket.connect(endpoint).await.map_err(|_| ReasonCode::NetworkError).unwrap();
println!("Connected to MQTT broker...");

let mut config: ClientConfig<'_, 20, CountingRng> = ClientConfig::new(
    rust_mqtt::client::client_config::MqttVersion::MQTTv5,
    CountingRng(20000),
);

config.add_max_subscribe_qos(rust_mqtt::packet::v5::publish_packet::QualityOfService::QoS1);
config.add_client_id("client");
config.max_packet_size = 20;

let mut recv_buffer = [0; 80];
let mut write_buffer = [0; 80];

let mut client = MqttClient::new(
    socket,
    &mut write_buffer,
    80,
    &mut recv_buffer,
    80,
    config,
);

println!("Connecting to broker...");
client.connect_to_broker().await.unwrap();

Compilation error comes when the last line is included in my code.

error: linking with rust-lld failed: exit status: 1

and a lot of

= note: rust-lld: error: undefined symbol: _defmt_acquire
rust-lld: error: undefined symbol: _defmt_release
rust-lld: error: undefined symbol: _defmt_write

I would be very happy for any suggestion on how to fix this. I would rather not go too far back into dependency hell and use old versions of embassy-net, and then of the HAL and esp-wifi :D
