nmqtt's Introduction

Native Nim MQTT client library and binaries

This is a hybrid package that includes a native Nim MQTT library and binaries for an MQTT broker, publisher and subscriber.

Install

You can install this package with Nimble:

$ nimble install nmqtt

or by cloning the repository and installing from source:

$ git clone https://github.com/zevv/nmqtt.git && cd nmqtt
$ nimble install

Binaries

The package provides four binaries:

  1. nmqtt -> Broker
  2. nmqtt_password -> Password utility for the broker
  3. nmqtt_pub -> MQTT publisher
  4. nmqtt_sub -> MQTT subscriber

nmqtt

A default configuration file is provided in config/nmqtt.conf. You can copy and paste this file to a desired location, or run nimble setup nmqtt which will guide you through it.

$ nmqtt --help
nmqtt version 1.0.0

nmqtt is a MQTT v3.1.1 broker

USAGE
  nmqtt [options]
  nmqtt [-c /path/to/config.conf]
  nmqtt [-h hostIP -p port]

CONFIG
  Use the configuration file for detailed settings,
  such as SSL, adjusting keep alive timer, etc. or
  specify options at the command line.

  To add and delete users from the password file
  please use nmqtt_password:
    - nmqtt_password -a|-b|-d [options]

OPTIONS
  -?, --help          print this cligen-erated help
  -c=, --config=      absolute path to the config file. Overrides all other options.
  -h=, --host=        IP-address to serve the broker on.
  -p=, --port=        network port to accept connecting from.
  -v=, --verbosity=   verbosity from 0-3.
  --max-conn=         max simultaneous connections. Defaults to no limit.
  --clientid-maxlen=  max lenght of clientid. Defaults to 65535.
  --clientid-spaces   allow spaces in clientid. Defaults to false.
  --clientid-empty    allow empty clientid and assign random id. Defaults to false.
  --client-kickold    kick old client, if new client has same clientid. Defaults to false.
  --clientid-pass     pass clientid in payload {clientid:payload}. Defaults to false.
  --password-file=    absolute path to the password file
  --ssl               activate ssl for the broker - requires --ssl-cert and --ssl-key.
  --ssl-cert=         absolute path to the ssl certificate.
  --ssl-key=          absolute path to the ssl key.

nmqtt_password

$ nmqtt_password --help
nmqtt_password is a user and password manager for nmqtt
nmqtt_password is based upon nmqtt version 1.0.0

USAGE
  nmqtt_password -a {password_file.conf} {username}
  nmqtt_password -b {password_file.conf} {username} {password}
  nmqtt_password -d {password_file.conf} {username}

CONFIG
  Add or delete users from nmqtt password file.

OPTIONS
  -?, --help     print this cligen-erated help
  -a, --adduser  add a new user to the password file.
  -b, --batch    run in batch mode to allow passing passwords on the command line.
  -d, --deluser  delete a user from the password file.

nmqtt_pub

$ ./nmqtt_pub --help
nmqtt_pub is a MQTT client for publishing messages to a MQTT-broker.
nmqtt_pub is based upon nmqtt version 1.0.0

Usage:
  nmqtt_pub [options] -t {topic} -m {message}
  nmqtt_pub [-h host -p port -u username -P password] -t {topic} -m {message}

OPTIONS
  -?, --help         print this cligen-erated help
  -h=, --host=       IP-address of the broker.
  -p=, --port=       network port to connect too.
  --ssl              use ssl.
  -c=, --clientid=   your connection ID. Defaults to nmqttpub- appended with processID.
  -u=, --username=   provide a username
  -P=, --password=   provide a password
  -t=, --topic=      mqtt topic to publish to.
  -m=, --msg=        message payload to send.
  -q=, --qos=        quality of service level to use for all messages.
  -r, --retain       retain messages on the broker.
  --repeat=          repeat the publish N times.
  --repeatdelay=     if using --repeat, wait N seconds between publish. Defaults to 0.
  --willtopic=       set the will's topic
  --willmsg=         set the will's message
  --willqos=         set the will's quality of service
  --willretain       set to retain the will message
  -v=, --verbosity=  set the verbosity level from 0-2. Defaults to 0.

nmqtt_sub

$ ./nmqtt_sub --help
nmqtt_sub is a MQTT client that will subscribe to a topic on a MQTT-broker.
nmqtt_sub is based upon nmqtt version 1.0.0

Usage:
  nmqtt_sub [options] -t {topic}
  nmqtt_sub [-h host -p port -u username -P password] -t {topic}

OPTIONS
  -?, --help         print this cligen-erated help
  -h=, --host=       IP-address of the broker. Defaults to 127.0.0.1
  -p=, --port=       network port to connect too. Defaults to 1883.
  --ssl              use ssl.
  -c=, --clientid=   your connection ID. Defaults to nmqttsub- appended with processID.
  -u=, --username=   provide a username
  -P=, --password=   provide a password
  -t=, --topic=      MQTT topic to subscribe too. For multipe topics, separate them by comma.
  -q=, --qos=        quality of service level to use for all messages. Defaults to 0.
  -k=, --keepalive=  keep alive in seconds for this client. Defaults to 60.
  --removeretained   clear any retained messages on the topic
  --willtopic=       set the will's topic
  --willmsg=         set the will's message
  --willqos=         set the will's quality of service
  --willretain       set to retain the will message
  -v=, --verbosity=  set the verbosity level from 0-2. Defaults to 0.

Library

This library includes all the procs needed for publishing MQTT messages to an MQTT broker and for subscribing to a topic on an MQTT broker. The library supports QoS 0, 1 and 2 for both publishing and subscribing, as well as sending retained messages.

Examples

Subscribe to topic

import nmqtt, asyncdispatch

let ctx = newMqttCtx("nmqttClient")
ctx.set_host("test.mosquitto.org", 1883)
#ctx.set_auth("username", "password")
#ctx.set_ping_interval(30)
#ctx.set_ssl_certificates("cert.crt", "private.key")

proc mqttSub() {.async.} =
  await ctx.start()
  proc on_data(topic: string, message: string) =
    echo "got ", topic, ": ", message

  await ctx.subscribe("nmqtt", 2, on_data)

asyncCheck mqttSub()
runForever()

Publish msg

proc mqttPub() {.async.} =
  await ctx.start()
  await ctx.publish("nmqtt", "hallo", 2)
  await sleepAsync 500
  await ctx.disconnect()

waitFor mqttPub()

Subscribe and publish

proc mqttSubPub() {.async.} =
  await ctx.start()

  # Callback when receiving on the topic
  proc on_data(topic: string, message: string) =
    echo "got ", topic, ": ", message

  # Subscribe to the topic `nmqtt`
  await ctx.subscribe("nmqtt", 2, on_data)
  await sleepAsync 500

  # Publish a message to the topic `nmqtt`
  await ctx.publish("nmqtt", "hallo", 2)
  await sleepAsync 500

  # Disconnect
  await ctx.disconnect()

waitFor mqttSubPub()

Procs

newMqttCtx*

proc newMqttCtx*(clientId: string): MqttCtx =

Initiate a new MQTT client


set_ping_interval*

proc set_ping_interval*(ctx: MqttCtx, txInterval: int) =

Set the client's ping interval in seconds. Default is 60 seconds.


set_ssl_certificates*

proc set_ssl_certificates*(ctx: MqttCtx, sslCert: string, sslKey: string) =

Set the SSL certificate and key files to use for mutual TLS authentication.


set_host*

proc set_host*(ctx: MqttCtx, host: string, port: int=1883, sslOn=false) =

Set the MQTT host


set_auth*

proc set_auth*(ctx: MqttCtx, username: string, password: string) =

Set the authentication for the host


set_will*

proc set_will*(ctx: MqttCtx, topic, msg: string, qos=0, retain=false) =

Set the client's will (the message the broker publishes if the client disconnects unexpectedly).


connect*

proc connect*(ctx: MqttCtx) {.async.} =

Connect to the broker.


start*

proc start*(ctx: MqttCtx) {.async.} =

Connect to the broker and reconnect automatically. The client tries to reconnect whenever its state is Disconnected or Error. The Error state occurs when the broker is down; the client keeps trying to reconnect until the broker is up again.


disconnect*

proc disconnect*(ctx: MqttCtx) {.async.} =

Disconnect from the broker.


publish*

proc publish*(ctx: MqttCtx, topic: string, message: string, qos=0, retain=false) {.async.} =

Publish a message.

Required:

  • topic: string
  • message: string

Optional:

  • qos: int = 0, 1 or 2
  • retain: bool = true or false

Publish message:

ctx.publish(topic = "nmqtt", message = "Hey there", qos = 0, retain = true)

Remove retained message on topic:

Publish an empty message with retain set to true.

ctx.publish(topic = "nmqtt", message = "", qos = 0, retain = true)

subscribe*

proc subscribe*(ctx: MqttCtx, topic: string, qos: int, callback: PubCallback): Future[void] =

Subscribe to a topic

The callback receives the topic and the message:

proc callbackName(topic: string, message: string) =
  echo "Topic: ", topic, ": ", message

unsubscribe*

proc unsubscribe*(ctx: MqttCtx, topic: string): Future[void] =

Unsubscribe from a topic.


isConnected*

proc isConnected*(ctx: MqttCtx): bool =

Returns true if the client is connected to the broker.


msgQueue*

proc msgQueue*(ctx: MqttCtx): int =

Returns the number of unfinished packets that are still in the work queue. This includes all publish and subscribe packets that have not been fully sent, acknowledged or completed.

You can use this to ensure that all of your messages are sent before exiting your program.
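
A minimal sketch of that idea, assuming you poll the queue length from your own code. `waitDrained` is a hypothetical helper, not part of nmqtt; it takes the queue length as a callback so the block compiles with the standard library only, and the timeout values are arbitrary.

```nim
import asyncdispatch

# Hypothetical helper (not part of nmqtt): poll `pending` until it reports 0
# or until roughly `timeoutMs` milliseconds have passed.
# Returns true if the queue drained in time, false on timeout.
proc waitDrained(pending: proc (): int,
                 timeoutMs = 5000, stepMs = 50): Future[bool] {.async.} =
  var waited = 0
  while pending() > 0:
    if waited >= timeoutMs:
      return false
    await sleepAsync(stepMs)
    waited += stepMs
  return true

# Usage with nmqtt, inside an async proc and before disconnecting:
#   let ok = await waitDrained(proc (): int = ctx.msgQueue())
#   if not ok: echo "work queue not empty, giving up"
#   await ctx.disconnect()
```

Passing the check as a callback keeps the helper independent of `MqttCtx`, so the same loop could wait on any other counter.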


nmqtt's People

Contributors

keslerm, lmn, ringabout, srd424, thomastjdev, zevv

nmqtt's Issues

The broker quits - "Keep alive time overdue".

The broker quit after a client disconnected. It has happened a few times, but it is not something I can reproduce. Below is all I could catch.

Using nmqtt-1.0.4, Nim 1.6.6, Windows 11

rx> Connect(00): 00 04 4D 51 54 54 04 06 00 3C 00 19 73 68 65 6C 6C 79 70 6C 75 67 2D 73 2D 42 43 46 46 34 44 35 41 32 36 30 37 00 29 73 68 65 6C 6C 69 65 73 2F 73 68 65 6C 6C 79 70 6C 75 67 2D 73 2D 42 43 46 46 34 44 35 41 32 36 30 37 2F 6F 6E 6C 69 6E 65 00 05 66 61 6C 73 65
tx> ConnAck(00): 00 02
Connections >> shellyplug-s-BCFF4D5A2607 was disconnected. Keep alive time overdue.
Client      >> shellyplug-s-BCFF4D5A2607
  host:
  sslOn: false
  sslCert:
  sslKey:
  verbosity: 0
  beenConnected: true
  username:
  password:
  clientId: shellyplug-s-BCFF4D5A2607
  s: ..disconnected..
  msgIdSeq: 3
  workQueue: 0
  pubCallbacks: 0
  inWork: false
  keepAlive: 60
  willFlag: false
  willQoS: 0
  willRetain: false
  willTopic: shellies/shellyplug-s-BCFF4D5A2607/online
  willMsg: false
  proto: MQTT
  version: 4
  connFlags: 00000110
  subscribed: 3

PS C:\Users\xxx>
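
For reading the dump above, here is a small sketch that decodes the CONNECT flags byte (`04 06` in the hex dump is protocol level 4 followed by flags `0x06`), using the bit layout from the MQTT 3.1.1 specification. `ConnectFlags` and `decodeConnectFlags` are hypothetical names for illustration, not nmqtt's internal types.

```nim
# Bit layout of the CONNECT flags byte in MQTT 3.1.1:
#   bit 7 username, bit 6 password, bit 5 will retain,
#   bits 4-3 will QoS, bit 2 will flag, bit 1 clean session, bit 0 reserved.
type
  ConnectFlags = object
    usernameFlag, passwordFlag, willRetain, willFlag, cleanSession: bool
    willQoS: int

# Hypothetical decoder, written only to illustrate the layout.
proc decodeConnectFlags(b: uint8): ConnectFlags =
  result.usernameFlag = (b and 0x80'u8) != 0
  result.passwordFlag = (b and 0x40'u8) != 0
  result.willRetain   = (b and 0x20'u8) != 0
  result.willQoS      = int((b shr 3) and 0x03'u8)
  result.willFlag     = (b and 0x04'u8) != 0
  result.cleanSession = (b and 0x02'u8) != 0

let flags = decodeConnectFlags(0x06'u8)   # connFlags: 00000110 from the dump
doAssert flags.willFlag and flags.cleanSession
echo flags
```

Decoded this way, the flags byte has the will flag set, which matches the will topic and message present in the packet, while the client dump prints `willFlag: false`. Whether that mismatch has anything to do with the broker quitting is not established here.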

BUG: Messages are sent twice

During the implementation of PR #9, subscribe() failed when using asyncCheck. Debugging the packets on the broker and running nmqtt with -d:dev shows that the packets are sent twice.

As a quick workaround, commit 62e5d8f checks that the received SUBACK packet matches a msgId in the queue. Without the check, the first SUBACK deletes the queued message, and the program crashes when it tries to delete the same message again.

The same problem shows up on publish, which might be part of issue #7?

Subscribe

Broker

1584256896: Sending CONNACK to hallo (0, 0)
1584256897: Received PINGREQ from hallo
1584256897: Sending PINGRESP to hallo
1584256897: Received SUBSCRIBE from hallo # => HERE
1584256897: 	# (QoS 2)
1584256897: Sending SUBACK to hallo  # => Package received, and msgId removed
1584256897: Received SUBSCRIBE from hallo # => HERE
1584256897: 	# (QoS 2)
1584256897: Sending SUBACK to hallo  # => Crash due to msgId already deleted
1584256898: Received PINGREQ from hallo
1584256898: Sending PINGRESP to hallo
1584256899: Received PINGREQ from hallo

Client

tx> Subscribe(02): 00 01 00 01 23 02 
tx> Subscribe(02): 00 01 00 01 23 02 
rx> PingResp(00): 
rx> SubAck(00): 00 01 02  # => HERE
rx> SubAck(00): 00 01 02  # => HERE
tx> PingReq(00): 
rx> PingResp(00): 
tx> PingReq(00): 

Publish

Broker

1584257384: Received PUBLISH from hallo (d0, q2, r0, m1, 'test1', ... (5 bytes))
1584257384: Sending PUBREC to hallo (m1, rc0)
1584257384: Received PUBLISH from hallo (d0, q2, r0, m1, 'test1', ... (5 bytes))
1584257384: Sending PUBREC to hallo (m1, rc0)
1584257384: Received PUBREL from hallo (Mid: 1)
1584257384: Sending PUBCOMP to hallo (m1)
1584257384: Received PUBREL from hallo (Mid: 1)
1584257384: Sending PUBCOMP to hallo (m1)

Client

tx> Publish(04): 00 05 74 65 73 74 31 00 01 68 61 6C 6C 6F 
tx> Publish(04): 00 05 74 65 73 74 31 00 01 68 61 6C 6C 6F 
rx> PingResp(00): 
rx> PubRec(00): 00 01 
tx> PubRel(02): 00 01 
rx> PubRec(00): 00 01 
tx> PubRel(02): 00 01 
rx> PubComp(00): 00 01 
rx> PubComp(00): 00 01 

Let user define `keep-alive` time

Currently the pings run once per second. Let the user define the keep-alive time based on the current signal strength or gut feeling. The maximum keep-alive is 18 h 12 min 15 s (65535 seconds, the largest value that fits the 16-bit keep-alive field).

proc runPing(ctx: MqttCtx) {.async.} =
  while true:
    await sleepAsync 1000
    let ok = await ctx.sendPingReq()
    if not ok:
      break
    await ctx.work()
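
As a quick check of the maximum mentioned above: the keep-alive field in the MQTT CONNECT packet is a 16-bit number of seconds. The sketch below works out that limit and clamps a user-supplied value to it. `clampKeepAlive` is a hypothetical name, not an nmqtt proc.

```nim
# Largest keep-alive that fits the 16-bit field, in seconds.
const maxKeepAliveSec = int(high(uint16))

# Hypothetical helper: force a user-supplied value into the valid range.
proc clampKeepAlive(seconds: int): uint16 =
  uint16(clamp(seconds, 0, maxKeepAliveSec))

let
  hours   = maxKeepAliveSec div 3600
  minutes = (maxKeepAliveSec mod 3600) div 60
  secs    = maxKeepAliveSec mod 60

echo hours, " h ", minutes, " min ", secs, " s"   # prints: 18 h 12 min 15 s
echo clampKeepAlive(100_000)                      # clamped to the maximum
```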

Ownership

@ThomasTJdev I currently don't have any need for MQTT, and that is usually not a good thing for my motivation to work on things. If you feel like it, I could transfer ownership of the repo to you or give you commit rights on the master branch?

Packet max size: 128 - increase?

When trying to send a large payload, the call goes through await send() without an error, but nothing is received on the MQTT broker. Furthermore, this also blocks all subsequent messages, pings, etc.
I think I have traced it to a size limit caused by uint8: when the size reaches 128, it breaks.

I have dug into the hdr and pkt structures in send(), but everything seems OK. Any suggestions?

Please try sending a payload (topic + msg) of 128 bytes or more.
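
One thing worth checking, as an assumption rather than a confirmed diagnosis: MQTT 3.1.1 stores the packet's "Remaining Length" in a variable-length format with 7 data bits per byte, so any length of 128 or more needs a second byte. The sketch below shows that encoding. `encodeRemainingLength` is a hypothetical name and not the proc used inside nmqtt.

```nim
# Variable-length "Remaining Length" encoding from the MQTT 3.1.1 spec:
# each byte carries 7 bits of the length, and bit 7 signals that
# another length byte follows. At most 4 bytes are allowed.
proc encodeRemainingLength(length: int): seq[uint8] =
  doAssert length in 0 .. 268_435_455, "length out of range for MQTT"
  var x = length
  while true:
    var digit = uint8(x mod 128)    # low 7 bits of what is left
    x = x div 128
    if x > 0:
      digit = digit or 0x80'u8      # continuation bit: more bytes follow
    result.add digit
    if x == 0:
      break

doAssert encodeRemainingLength(127) == @[0x7F'u8]            # fits one byte
doAssert encodeRemainingLength(128) == @[0x80'u8, 0x01'u8]   # needs two bytes
```

If the length were written as a single raw byte instead, a value of 128 would be read by the broker as "0, more bytes follow", which would leave it waiting for data and could look like the blocking described above.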

Showcase: GUI for nmqtt_pub

This is just a showcase for a GUI for nmqtt_pub. When the verbose parameter in nmqtt_pub is updated, the output will be much better. Furthermore, a config for previous connection or favorites could be generated for ease of connecting in the GUI (this will not happen in nmqtt library).

https://github.com/ThomasTJdev/nmqttgui

"Subscribe to topic" example does not compile.

Attempting to compile the example given in the readme:

import nmqtt, asyncdispatch

let ctx = newMqttCtx("nmqttClient")
ctx.set_host("test.mosquitto.org", 1883)
#ctx.set_auth("username", "password")
#ctx.set_ping_interval(30)
#ctx.set_ssl_certificates("cert.crt", "private.key")

proc mqttSub() {.async.} =
  await ctx.start()
  proc on_data(topic: string, message: string) =
    echo "got ", topic, ": ", message

  await ctx.subscribe("nmqtt", 2, on_data)

asyncCheck mqttSub
runForever()

as is with a fresh nimble project (on Nim 1.2.4 and with nmqtt 1.0.3) gets me this:

        ... /home/mihara/Projects/testify/src/testify.nim(16, 12) Error: type mismatch: got <proc (): Future[system.void]{.locks: <unknown>.}>
        ... but expected one of: 
        ... proc asyncCheck[T](future: Future[T])
        ...   first type mismatch at position: 1
        ...   required type for future: Future[asyncCheck.T]
        ...   but expression 'mqttSub' is of type: proc (): Future[system.void]{.locks: <unknown>.}
        ... expression: asyncCheck mqttSub

Now, I will be the first person to admit I don't know heads or tails of how asynchronous procedures work in nim, being new to the language, but this error message doesn't seem very helpful. After much mucking around, I've found that the error message means that the compiler has decided that the mqttSub proc is for whatever reason not GC-safe. That, however, is where I'm stuck from there on: I don't know how to make it GC-safe, and whatever is making it not-GC-safe appears to be caused by a macro. Which I still haven't figured out how to debug.

I can of course write discard mqttSub() instead, and it compiles (and even works, if I put my MQTT server in), but the Nim manual warns against doing that specifically. I'm probably missing something obvious, but in any case, I think the example should be corrected.
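
Going by the error message alone, the compiler received the proc `mqttSub` itself rather than the Future it returns, which is what happens when the call parentheses are missing. The standalone sketch below shows the difference using only asyncdispatch; `work` is a hypothetical stand-in for `mqttSub`.

```nim
import asyncdispatch

# Stand-in for mqttSub: any proc marked {.async.} returns a Future when called.
proc work() {.async.} =
  await sleepAsync(10)
  echo "done"

# `work` without parentheses is the proc; `work()` is the Future it returns.
# asyncCheck expects a Future, so this form compiles:
let fut = work()
asyncCheck fut

# asyncCheck work     # type mismatch: passes the proc, not a Future

# Run the event loop until the future has completed.
waitFor fut
```

Under that reading, writing `asyncCheck mqttSub()` in the example should be enough, with no change related to GC safety.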

connect() does not raise exception on error

I've got a "one-shot" program that publishes a few values then quits, so I'm calling connect() instead of start(), but unfortunately this doesn't seem to raise an exception if the TCP connection fails.

I contemplated using isConnected() to check, but I think it's possible for the TCP connection to have completed OK while the MQTT handshake has not, in which case the state won't be "connected" yet?

License?

We don't have an explicit LICENSE file yet

Long running process exits with IOSelectorsException

I have a program which runs for a long time, every once in a while sending something over MQTT. Recently I got a crash with this error message:

  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(2022) waitFor
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1711) poll
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1452) runOnce
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(269) processPendingCallbacks
  /home/peter/.nimble/pkgs2/nmqtt-1.0.5-e9b66114ab9f39559f01e200164bfff921c0c8f3/nmqtt.nim(1094) runConnect (Async)
  /home/peter/.nimble/pkgs2/nmqtt-1.0.5-e9b66114ab9f39559f01e200164bfff921c0c8f3/nmqtt.nim(1066) connectBroker (Async)
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncnet.nim(296) dial (Async)
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1902) dial
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1868) tryNextAddrInfo
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1733) createAsyncNativeSocket
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/asyncdispatch.nim(1247) register
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/ioselects/ioselectors_epoll.nim(136) registerHandle
  /home/peter/.choosenim/toolchains/nim-2.0.0/lib/pure/selectors.nim(287) raiseIOSelectorsError
Exception message: Maximum number of descriptors is exhausted!
 [IOSelectorsException]

Might be something else in my program which leaks descriptors, but figured I'd report it here first since the error was triggered in nmqtt. The MqttContext lives for the lifetime of the program, and the messages are sent wrapped in a start/disconnect construct (but not in a try/finally one).

Reconnect after the broker has been down

Auto-reconnect

I ran into a problem yesterday when my broker went offline. nmqtt tried to reconnect with runConnect(), but because the host was down, an OSError was raised and ctx.state was set to Error.
After that my client ran the runConnect() loop each second, but because the state was Error, it never reconnected.

Imagine you have hundreds of IoT clients and the broker has to be maintained and restarted. Now each of those clients needs to be restarted to reconnect.

Possible solutions

I would like to propose that nmqtt provides a solution for the above scenario. I would prefer something like #Sugg1, where I don't have to implement other procs or templates to check.

Sugg1

An option to reconnect X times, or for a period Y, with an interval Z. The example below has two reconnect states: the normal Disconnected state and an optional Error state.

type
  MqttCtx* = ref object
    reconnect: seq[ReconnectState]
    reconnectInterval: int # seconds

  ReconnectState = enum
    Disconnected, Error


proc runConnect(ctx: MqttCtx) {.async.} =
  while true:
    if ctx.state in ctx.reconnect:
      # Reconnect
    await sleepAsync(ctx.reconnectInterval * 1000)


proc set_reconnect*(ctx: MqttCtx, states: varargs[State] = @[Disconnected]) =
  for state in states:
    ctx.reconnect.add(state)

proc set_reconnect_interval*(ctx: MqttCtx, interval: int = 1) =
  ctx.reconnectInterval = interval

ctx.set_reconnect(Disconnected, Error)
ctx.set_reconnect_interval(10) # sec
await publish(xxx)

Sugg2

Implement an isConnected*() that the user can call to check. When the result is false, the user can connect/start again. This would require us to break out of the while true loop inside runConnect(), to avoid spawning more loops.

if not ctx.isConnected():
  ctx.start()
await publish(xxx)

Sugg3

Make MqttCtx.state and State.Error public, so that users can implement their own checking, e.g.:

if ctx.state == Error:
  ctx.start()
await publish(xxx)

Add github topics to the repo

Hi @zevv

Will you add some topics (tags) beneath the repo title? E.g. mqtt, mqtt-client, mqtt-server, mqtt-broker, mqtt-library.

Thank you.

Publish 100 msg with qos=2 fails

Issue:
When publishing multiple messages (around 100) with qos=2, not all messages are sent. This can be confirmed by checking the length of ctx.workQueue or by subscribing to the topic with Mosquitto. There is no consistent pattern in which messages are not sent.

Possible solutions:
My first thought is that the packet handling blocks or gets out of order. This needs debugging in nmqtt and a check of the packet order on the broker.

Test suite:
This currently fails in test "publish multiple message fast qos=2"

Example:
Example for publishing - monitor your broker.

import nmqtt, asyncdispatch
let ctx = newMqttCtx("hallopub")
ctx.set_host("127.0.0.1", 1883)

proc conn() {.async.} =
  await ctx.start()
  var msg: int
  for i in 1 .. 100:
    await ctx.publish("test1", $msg, 2)
    msg += 1

asyncCheck conn()
runForever()

Automatic connect/reconnect

My original idea was to make the lib automatically handle the session connect and reconnect without bothering the user/application. This would allow for fire-and-forget, which was also the reason for start() to just do an async connect and queue all the work.

With the recent changes start() now acts as a connect() and requires the user to wait for the connection - that was the reason for the loop with the async sleep in runConnect()

Making start() wait is not a bad thing because the application has more control over the connection setup, but how should the lib now handle automatic reconnection on disconnect? Will every publish() fail when the underlying connection has gone? Or should we queue them like now and send as soon as the connection is restored?
