Giter Club home page Giter Club logo

Comments (12)

X-Ryl669 avatar X-Ryl669 commented on September 26, 2024

Sure:

// Usual programs
#include <stdio.h>
#include <stdlib.h>

// We need MQTT client 
#include "Network/Clients/MQTT.hpp"


struct MessageReceiver : public Network::Client::MessageReceived
{
    void messageReceived(const Network::Client::MQTTv5::DynamicStringView & topic, const Network::Client::MQTTv5::DynamicBinDataView & payload, 
                         const uint16 packetIdentifier, const Network::Client::MQTTv5::PropertiesView & properties)
    {
        fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
        fprintf(stdout, "  Topic: %.*s\n", topic.length, topic.data);
        fprintf(stdout, "  Payload: %.*s\n", payload.length, payload.data);
    }

};

Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;

volatile bool cont = true;
void ctrlc(int sig)
{
    if (sig == SIGINT) cont = false;
}


int main(int argc, const char ** argv)
{
    
    const char * server = "yourserver.com";
    const uint16 port = 1883;
    const char * username = "bob";
    const char * password = "sponge";
    const char * subscribe = "/topic/to/subscribe/to";
    const char * publishTopic = "/topic/to/publish/to";
    const char * publishMessage = "I'm alive... again";
    unsigned keepAlive = 300; 
    
    MessageReceiver receiver;
    
    Network::Client::MQTTv5 client(clientID, &receiver);
    Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8*)password);

    if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false, 
                                                                  (uint16)min(65535U, keepAlive), true, username, &pw))
    {
        return fprintf(stderr, "Failed connection to %s with error: %d\n", (const char*)serverURL.asText(), (int)ret); 
    }
    printf("Connected to %s\n", (const char*)serverURL.asText());

    // Check if we have some subscription
        if (Network::Client::MQTTv5::ErrorType ret = client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
        {
            return fprintf(stderr, "Failed subscribing to %s with error: %d\n", (const char*)subscribe, (int)ret);
        }
        printf("Subscribed to %s\nWaiting for messages...\n", (const char*)subscribe);

        // Then enter the event loop here
        signal(SIGINT, ctrlc);
        time_t last = time(NULL);
        while (cont)
        {
            if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
                return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);

            // Is it time to publish ?
            if (time(NULL) - last >= 1) {
                if (Network::Client::MQTTv5::ErrorType ret = client.publish(publishTopic, publishMessage, strlen(publishMessage), false, QoS)) {
                  return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", publishMessage, publishTopic, (int)ret);
              }
              printf("Published %s to %s\n", publishMessage, publishTopic);
              last = time(NULL);
           }
       }


        return 0;
}

This is more or less the same as in the test client file MQTTc.cpp. I've removed all utility code so it's as simple as it can be. I haven't tested the code, but you get the idea...

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Thanks! I will run the code today and will let you know if I face any issues.

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Hello, I was able to run the code. I modified the code a little bit. I have one question. In the modified code below whenever I am getting a data in subscriber topic, I am publishing the timestamp. I am checking the echoCheck variable and if I receive any message, I am publishing timestamp in while (cont) loop. Is this the correct way of doing it? or can I publish in the messageReceived function? If possible can you guide me how to implement it? My concern is as I am running a infinte while loop and checking the echoCheck variable continuously it will increase the CPU usage.

#include <stdio.h>
#include <stdlib.h>
#include <ctime>
#include <signal.h>

// We need MQTT client
#include "Network/Clients/MQTT.hpp"
int echoCheck=0;
char* data; 
struct MessageReceiver : public Network::Client::MessageReceived
{
    void messageReceived(const Network::Client::MQTTv5::DynamicStringView& topic,
                         const Network::Client::MQTTv5::DynamicBinDataView& payload,
                         const uint16_t packetIdentifier,
                         const Network::Client::MQTTv5::PropertiesView& properties)
    {
        fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
        fprintf(stdout, "  Topic: %.*s\n", topic.length, topic.data);
        fprintf(stdout, "  Payload: %.*s\n", payload.length, payload.data);
        echoCheck=1;
    }
};

Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;

volatile bool cont = true;

void ctrlc(int sig)
{
    if (sig == SIGINT)
        cont = false;
}

int main(int argc, const char** argv)
{
    const char* server = "localhost";
    const uint16_t port = 1883;
    const char* username = "bob1";
    const char* password = "sponge1";
    const char* subscribe = "topic1";
    const char* publishTopic = "topic2";
    const char* clientID = "Client2";
    unsigned keepAlive = 300;

    MessageReceiver receiver;

    Network::Client::MQTTv5 client(clientID, &receiver);
    Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8_t*)password);

    if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false,
                                                                  (uint16_t)std::min(65535U, keepAlive), true,
                                                                  username, &pw))
    {
        return fprintf(stderr, "Failed connection to %s with error: %d\n", server, (int)ret);
    }
    printf("Connected to %s\n", server);

    // Check if we have some subscription
    if (Network::Client::MQTTv5::ErrorType ret =
            client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
    {
        return fprintf(stderr, "Failed subscribing to %s with error: %d\n", subscribe, (int)ret);
    }
    printf("Subscribed to %s\nWaiting for messages...\n", subscribe);

    // Then enter the event loop here
    signal(SIGINT, ctrlc);
    time_t lastPublishTime = time(NULL);

    while (cont)
    {
        if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
            return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);

        // Check if it's time to publish (every second)
        time_t currentTime = time(NULL);
        if (echoCheck == 1)
        {
            // Convert current time to string format
            char timestamp[20];
            strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", localtime(&currentTime));

            // Publish the timestamp
            if (Network::Client::MQTTv5::ErrorType ret =
                    client.publish(publishTopic, (const uint8_t*)timestamp, strlen(timestamp), false, QoS))
            {
                return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", timestamp, publishTopic,
                               (int)ret);
            }

            printf("Published %s to %s\n", timestamp, publishTopic);
            lastPublishTime = currentTime;
            echoCheck=0;
        }
    }

    return 0;
}

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on September 26, 2024

There's no notion of threads in eMQTT5. This means that MessageReceiver::messageReceived is called inside the eventLoop method.
So yes, you can publish in the messageReceived method.
The while loop isn't an infinite loop either because inside the eventLoop there's a call to select (or epoll or kevent depending on the platform) for the MQTT socket. This gives the control to the kernel, and the kernel will sleep the CPU for the default timeout period (you can change with client.setDefaultTimeout) until there's an event on the socket, not wasting CPU resources.

On a microcontroller like the ESP32, this also works this way, the application CPU is waiting on a FreeRTOS' semaphore that will only be signaled when a WIFI packet is received, not burning CPU cycles. Yet, on such micro-controller, because the WIFI hardware device must listen to messages, it can't be put to sleep, so you'll get the usual WIFI consumption of 160mA.

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Thank you for the information. I have another question. On paho there is on_publish callback to check the message has been published or not.

On_publish Callback
When the message has been published to the Broker an acknowledgement is sent that results in the on_publish callback being called.

Is there any similar thing in the eMQTT5 library? I have only seen messageReceived method.

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on September 26, 2024

Depends on what you expect. If you want to make sure the message is received by the broker, you can set the QoS to AtLeastOne that's QoS=1 or ExactlyOne that's QoS=2. Any missing ACK will return an error from publish method.
If you are paranoid that the broker is acting as expected, you can also subscribe to the same topic you're publishing to and set the withAutoFeedback parameter to true when subscribing so you'll receive your own message.

If you want to make sure the message is just sent, then you'll check the return code of the publish method. If it's 0 (or Success) than you are sure the message was sent. If it's anything other than that, there was a failure sending or creating the message.

When you call publish, the whole publish cycle is performed so it either result in a successful publishing or a failed publishing, with no intermediate or transcient state

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Thanks for the information! I am getting CPU usage very low for my use case.

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

I have some general questions about the library to find compatibility for my project. If you can answer the following questions that would be really helpful:

  • Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?

  • Is there any dynamic memory allocation (new, delete)?

  • Does the library have threading model or single thread execution?

  • How are the exception handled? (Exception handling should not throw and end execution)

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on September 26, 2024
* Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?

There is no such thing as ACE in the library. It can be made compatible with it, but you'll need to write a wrapper. Since the library use the minimum possible dependencies, there's almost nothing to wrap (no thread, no filesystem, no OS, etc...), only the socket code, so it shouldn't be hard I think. Please have a look at esp-eMQTT5 port for an example wrapping for FreeRTOS/Lwip, it's ~100 lines of code.

* Is there any dynamic memory allocation (new, delete)?

No, see here. There's only one heap allocation for the socket and the single network buffer when creating the MQTTClient instance, but you can substitute it for a stack based buffer instead if you need so (change the line 695 of MQTTClient.cpp to look like this:

        /** The receiving data buffer */
        uint8               recvBuffer[YOUR MAX EXPECTED PACKET SIZE HERE IN BYTE];
        /** The receiving VBInt size for the packet header */
        uint8               packetExpectedVBSize;
        BaseSocket    _socket; 

        uint16 allocatePacketID()
        {
            return ++publishCurrentId;
        }

        Impl(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert)
             : _socket(YOUR TIMEOUT HERE IN MS), socket(0), brokerCert(brokerCert), clientID(clientID), cb(callback), timeoutMs({3, 0}), lastCommunication(0), publishCurrentId(0), keepAlive(300),
#if MQTTUseUnsubscribe == 1        
               unsubscribeId(0), lastUnsubscribeError(ErrorType::WaitingForResult),
#endif
               recvState(Ready), recvBufferSize(max(callback->maxPacketSize(), 8U)), maxPacketSize(65535), available(0), packetExpectedVBSize(Protocol::MQTT::Common::VBInt(recvBufferSize).getSize())
        {}
        ~Impl() { }

[...And around line 858...]
        void close()
        {
            _socket.close();
            socket = 0;
        }

        bool isOpen()
        {
            return socket;
        }

        int send(const char * buffer, const int size) { return socket ? socket->send(buffer, size) : -1; }

        int connectWith(const char * host, const uint16 port, const bool withTLS)
        {
            if (isOpen()) return -1;
            socket = &_socket;
            return socket ? socket->connect(host, port, brokerCert) : -1;
        }
* Does the library have threading model or single thread execution?

There's no thread used in the library. Your main thread/task call the library and everything is done in the eventLoop function. There's no mutex, no lock, whatsoever.
When used in a single thread execution model, there's nothing to do.
The library isn't re-entrant and because of this, if used in a multithreaded environment you need to protect access to it with your OS's mutex and primitives to avoid multiple thread from corrupting the internal state of the library.

* How are the exception handled? (Exception handling should not throw and end execution)

There is no exception thrown or managed in the library. Exceptions are not used in eMQTT5 since it bloats the application. Every error is returned by the function return type. Either it's an ErrorType (an enum) or a bool (with obvious true = success, false = failure) or a pointer (with nullptr = failure, IIRC).

Even more, there is no RTTI used either. It was made to shrink the application binary size to the absolute minimum possible, so any optional feature of C++ was disabled.

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Thank you so much for the information. It really helped me a lot. As this library supports v5, does this mean than it can also connect to MQTT broker running on v3.1.1?

from emqtt5.

X-Ryl669 avatar X-Ryl669 commented on September 26, 2024

No, the protocol is different and it's not retro-compatible. Most brokers support v5.0 so it's not a big deal IRL. In fact, it's easier to write a MQTTv5 broker than it is for a MQTTv3.0/1.1, since you don't have to deal with so many failure.
For those who don't, they will refuse to connect because the client only claims V5 on first connection and not V3.1.1 and it won't retry, so it's safe and won't break the old network's packet parsers. Supporting both would require adding 60% of binary size and I don't think it's worth it since MQTTv3 is now deprecated.

from emqtt5.

nitol-saha avatar nitol-saha commented on September 26, 2024

Thank you so much for the information.

from emqtt5.

Related Issues (15)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.