Comments (12)
Sure:
// Usual programs
#include <stdio.h>
#include <stdlib.h>
// We need MQTT client
#include "Network/Clients/MQTT.hpp"
struct MessageReceiver : public Network::Client::MessageReceived
{
void messageReceived(const Network::Client::MQTTv5::DynamicStringView & topic, const Network::Client::MQTTv5::DynamicBinDataView & payload,
const uint16 packetIdentifier, const Network::Client::MQTTv5::PropertiesView & properties)
{
fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
fprintf(stdout, " Topic: %.*s\n", topic.length, topic.data);
fprintf(stdout, " Payload: %.*s\n", payload.length, payload.data);
}
};
Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;
volatile bool cont = true;
void ctrlc(int sig)
{
if (sig == SIGINT) cont = false;
}
int main(int argc, const char ** argv)
{
const char * server = "yourserver.com";
const uint16 port = 1883;
const char * username = "bob";
const char * password = "sponge";
const char * subscribe = "/topic/to/subscribe/to";
const char * publishTopic = "/topic/to/publish/to";
const char * publishMessage = "I'm alive... again";
unsigned keepAlive = 300;
MessageReceiver receiver;
Network::Client::MQTTv5 client(clientID, &receiver);
Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8*)password);
if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false,
(uint16)min(65535U, keepAlive), true, username, &pw))
{
return fprintf(stderr, "Failed connection to %s with error: %d\n", (const char*)serverURL.asText(), (int)ret);
}
printf("Connected to %s\n", (const char*)serverURL.asText());
// Check if we have some subscription
if (Network::Client::MQTTv5::ErrorType ret = client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
{
return fprintf(stderr, "Failed subscribing to %s with error: %d\n", (const char*)subscribe, (int)ret);
}
printf("Subscribed to %s\nWaiting for messages...\n", (const char*)subscribe);
// Then enter the event loop here
signal(SIGINT, ctrlc);
time_t last = time(NULL);
while (cont)
{
if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);
// Is it time to publish ?
if (time(NULL) - last >= 1) {
if (Network::Client::MQTTv5::ErrorType ret = client.publish(publishTopic, publishMessage, strlen(publishMessage), false, QoS)) {
return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", publishMessage, publishTopic, (int)ret);
}
printf("Published %s to %s\n", publishMessage, publishTopic);
last = time(NULL);
}
}
return 0;
}
This is more or less the same as in the test client file MQTTc.cpp. I've removed all utility code so it's as simple as it can be. I haven't tested the code, but you get the idea...
from emqtt5.
Thanks! I will run the code today and will let you know if I face any issues.
from emqtt5.
Hello, I was able to run the code. I modified the code a little bit. I have one question. In the modified code below whenever I am getting a data in subscriber topic, I am publishing the timestamp. I am checking the echoCheck
variable and if I receive any message, I am publishing timestamp in while (cont)
loop. Is this the correct way of doing it? or can I publish in the messageReceived
function? If possible can you guide me how to implement it? My concern is as I am running a infinte while loop and checking the echoCheck
variable continuously it will increase the CPU usage.
#include <stdio.h>
#include <stdlib.h>
#include <ctime>
#include <signal.h>
// We need MQTT client
#include "Network/Clients/MQTT.hpp"
int echoCheck=0;
char* data;
struct MessageReceiver : public Network::Client::MessageReceived
{
void messageReceived(const Network::Client::MQTTv5::DynamicStringView& topic,
const Network::Client::MQTTv5::DynamicBinDataView& payload,
const uint16_t packetIdentifier,
const Network::Client::MQTTv5::PropertiesView& properties)
{
fprintf(stdout, "Msg received: (%04X)\n", packetIdentifier);
fprintf(stdout, " Topic: %.*s\n", topic.length, topic.data);
fprintf(stdout, " Payload: %.*s\n", payload.length, payload.data);
echoCheck=1;
}
};
Network::Client::MQTTv5::QoSDelivery QoS = Network::Client::MQTTv5::QoSDelivery::AtMostOne;
volatile bool cont = true;
void ctrlc(int sig)
{
if (sig == SIGINT)
cont = false;
}
int main(int argc, const char** argv)
{
const char* server = "localhost";
const uint16_t port = 1883;
const char* username = "bob1";
const char* password = "sponge1";
const char* subscribe = "topic1";
const char* publishTopic = "topic2";
const char* clientID = "Client2";
unsigned keepAlive = 300;
MessageReceiver receiver;
Network::Client::MQTTv5 client(clientID, &receiver);
Network::Client::MQTTv5::DynamicBinDataView pw(strlen(password), (const uint8_t*)password);
if (Network::Client::MQTTv5::ErrorType ret = client.connectTo(server, port, false,
(uint16_t)std::min(65535U, keepAlive), true,
username, &pw))
{
return fprintf(stderr, "Failed connection to %s with error: %d\n", server, (int)ret);
}
printf("Connected to %s\n", server);
// Check if we have some subscription
if (Network::Client::MQTTv5::ErrorType ret =
client.subscribe(subscribe, Protocol::MQTT::V5::GetRetainedMessageAtSubscriptionTime, true, QoS, false))
{
return fprintf(stderr, "Failed subscribing to %s with error: %d\n", subscribe, (int)ret);
}
printf("Subscribed to %s\nWaiting for messages...\n", subscribe);
// Then enter the event loop here
signal(SIGINT, ctrlc);
time_t lastPublishTime = time(NULL);
while (cont)
{
if (Network::Client::MQTTv5::ErrorType ret = client.eventLoop())
return fprintf(stderr, "Event loop failed with error: %d\n", (int)ret);
// Check if it's time to publish (every second)
time_t currentTime = time(NULL);
if (echoCheck == 1)
{
// Convert current time to string format
char timestamp[20];
strftime(timestamp, sizeof(timestamp), "%Y-%m-%d %H:%M:%S", localtime(¤tTime));
// Publish the timestamp
if (Network::Client::MQTTv5::ErrorType ret =
client.publish(publishTopic, (const uint8_t*)timestamp, strlen(timestamp), false, QoS))
{
return fprintf(stderr, "Failed publishing %s to %s with error: %d\n", timestamp, publishTopic,
(int)ret);
}
printf("Published %s to %s\n", timestamp, publishTopic);
lastPublishTime = currentTime;
echoCheck=0;
}
}
return 0;
}
from emqtt5.
There's no notion of threads in eMQTT5. This means that MessageReceiver::messageReceived
is called inside the eventLoop
method.
So yes, you can publish in the messageReceived
method.
The while loop isn't an infinite loop either because inside the eventLoop
there's a call to select
(or epoll
or kevent
depending on the platform) for the MQTT socket. This gives the control to the kernel, and the kernel will sleep the CPU for the default timeout period (you can change with client.setDefaultTimeout
) until there's an event on the socket, not wasting CPU resources.
On a microcontroller like the ESP32, this also works this way, the application CPU is waiting on a FreeRTOS' semaphore that will only be signaled when a WIFI packet is received, not burning CPU cycles. Yet, on such micro-controller, because the WIFI hardware device must listen to messages, it can't be put to sleep, so you'll get the usual WIFI consumption of 160mA.
from emqtt5.
Thank you for the information. I have another question. On paho there is on_publish callback to check the message has been published or not.
On_publish Callback
When the message has been published to the Broker an acknowledgement is sent that results in the on_publish callback being called.
Is there any similar thing in the eMQTT5 library? I have only seen messageReceived
method.
from emqtt5.
Depends on what you expect. If you want to make sure the message is received by the broker, you can set the QoS to AtLeastOne
that's QoS=1 or ExactlyOne
that's QoS=2. Any missing ACK will return an error from publish
method.
If you are paranoid that the broker is acting as expected, you can also subscribe to the same topic you're publishing to and set the withAutoFeedback
parameter to true when subscribing so you'll receive your own message.
If you want to make sure the message is just sent, then you'll check the return code of the publish
method. If it's 0 (or Success) than you are sure the message was sent. If it's anything other than that, there was a failure sending or creating the message.
When you call publish
, the whole publish cycle is performed so it either result in a successful publishing or a failed publishing, with no intermediate or transcient state
from emqtt5.
Thanks for the information! I am getting CPU usage very low for my use case.
from emqtt5.
I have some general questions about the library to find compatibility for my project. If you can answer the following questions that would be really helpful:
-
Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?
-
Is there any dynamic memory allocation (new, delete)?
-
Does the library have threading model or single thread execution?
-
How are the exception handled? (Exception handling should not throw and end execution)
from emqtt5.
* Is the library compatible or can be made compatible with ACE (Adaptive Communication Environment) (http://www.dre.vanderbilt.edu/~schmidt/ACE-overview.html)?
There is no such thing as ACE in the library. It can be made compatible with it, but you'll need to write a wrapper. Since the library use the minimum possible dependencies, there's almost nothing to wrap (no thread, no filesystem, no OS, etc...), only the socket code, so it shouldn't be hard I think. Please have a look at esp-eMQTT5 port for an example wrapping for FreeRTOS/Lwip, it's ~100 lines of code.
* Is there any dynamic memory allocation (new, delete)?
No, see here. There's only one heap allocation for the socket and the single network buffer when creating the MQTTClient
instance, but you can substitute it for a stack based buffer instead if you need so (change the line 695 of MQTTClient.cpp to look like this:
/** The receiving data buffer */
uint8 recvBuffer[YOUR MAX EXPECTED PACKET SIZE HERE IN BYTE];
/** The receiving VBInt size for the packet header */
uint8 packetExpectedVBSize;
BaseSocket _socket;
uint16 allocatePacketID()
{
return ++publishCurrentId;
}
Impl(const char * clientID, MessageReceived * callback, const DynamicBinDataView * brokerCert)
: _socket(YOUR TIMEOUT HERE IN MS), socket(0), brokerCert(brokerCert), clientID(clientID), cb(callback), timeoutMs({3, 0}), lastCommunication(0), publishCurrentId(0), keepAlive(300),
#if MQTTUseUnsubscribe == 1
unsubscribeId(0), lastUnsubscribeError(ErrorType::WaitingForResult),
#endif
recvState(Ready), recvBufferSize(max(callback->maxPacketSize(), 8U)), maxPacketSize(65535), available(0), packetExpectedVBSize(Protocol::MQTT::Common::VBInt(recvBufferSize).getSize())
{}
~Impl() { }
[...And around line 858...]
void close()
{
_socket.close();
socket = 0;
}
bool isOpen()
{
return socket;
}
int send(const char * buffer, const int size) { return socket ? socket->send(buffer, size) : -1; }
int connectWith(const char * host, const uint16 port, const bool withTLS)
{
if (isOpen()) return -1;
socket = &_socket;
return socket ? socket->connect(host, port, brokerCert) : -1;
}
* Does the library have threading model or single thread execution?
There's no thread used in the library. Your main thread/task call the library and everything is done in the eventLoop
function. There's no mutex, no lock, whatsoever.
When used in a single thread execution model, there's nothing to do.
The library isn't re-entrant and because of this, if used in a multithreaded environment you need to protect access to it with your OS's mutex and primitives to avoid multiple thread from corrupting the internal state of the library.
* How are the exception handled? (Exception handling should not throw and end execution)
There is no exception thrown or managed in the library. Exceptions are not used in eMQTT5 since it bloats the application. Every error is returned by the function return type. Either it's an ErrorType
(an enum) or a bool
(with obvious true = success, false = failure) or a pointer (with nullptr = failure, IIRC).
Even more, there is no RTTI used either. It was made to shrink the application binary size to the absolute minimum possible, so any optional feature of C++ was disabled.
from emqtt5.
Thank you so much for the information. It really helped me a lot. As this library supports v5, does this mean than it can also connect to MQTT broker running on v3.1.1?
from emqtt5.
No, the protocol is different and it's not retro-compatible. Most brokers support v5.0 so it's not a big deal IRL. In fact, it's easier to write a MQTTv5 broker than it is for a MQTTv3.0/1.1, since you don't have to deal with so many failure.
For those who don't, they will refuse to connect because the client only claims V5 on first connection and not V3.1.1 and it won't retry, so it's safe and won't break the old network's packet parsers. Supporting both would require adding 60% of binary size and I don't think it's worth it since MQTTv3 is now deprecated.
from emqtt5.
Thank you so much for the information.
from emqtt5.
Related Issues (15)
- Cannot build with MQTTOnlyBSDSocket set to 0 [Windows build] HOT 7
- Problems including the library HOT 11
- Porting to Renesas MCU without OS HOT 1
- execinfo.h does not exist in alpine linux 3.19 HOT 2
- read properties of received packet HOT 9
- readFrom() return BadData for 4-byte buffer HOT 2
- Linux specific int types HOT 3
- Adding properties to PUBLISH packet. HOT 4
- missing functions getEthernetRate and getWIFIRate HOT 9
- eMQTT5 does not install at all HOT 1
- There seems no definition of Platform::free and so on for macOS. HOT 18
- Clean source dependency for readability HOT 6
- No support for unsubscribing HOT 4
- Stuck in eventLoop HOT 18
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from emqtt5.