Giter Club home page Giter Club logo

gmq's Introduction

GMQ - Pure Go MQTT Client

wercker status Build status Coverage Status GoDoc ![Gitter](https://badges.gitter.im/Join Chat.svg)

Overview

GMQ is a pure Go MQTT client. This library is compatible with MQTT Version 3.1.1. This library provides both a Go package and a command line application.

Installation

$ go get -u github.com/yosssi/gmq/...

MQTT Client Go Package

Example

package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/yosssi/gmq/mqtt"
	"github.com/yosssi/gmq/mqtt/client"
)

func main() {
	// Set up channel on which to send signal notifications.
	sigc := make(chan os.Signal, 1)
	signal.Notify(sigc, os.Interrupt, os.Kill)

	// Create an MQTT Client.
	cli := client.New(&client.Options{
		// Define the processing of the error handler.
		ErrorHandler: func(err error) {
			fmt.Println(err)
		},
	})

	// Terminate the Client.
	defer cli.Terminate()

	// Connect to the MQTT Server.
	err := cli.Connect(&client.ConnectOptions{
		Network:  "tcp",
		Address:  "iot.eclipse.org:1883",
		ClientID: []byte("example-client"),
	})
	if err != nil {
		panic(err)
	}

	// Subscribe to topics.
	err = cli.Subscribe(&client.SubscribeOptions{
		SubReqs: []*client.SubReq{
			&client.SubReq{
				TopicFilter: []byte("foo"),
				QoS:         mqtt.QoS0,
				// Define the processing of the message handler.
				Handler: func(topicName, message []byte) {
					fmt.Println(string(topicName), string(message))
				},
			},
			&client.SubReq{
				TopicFilter: []byte("bar/#"),
				QoS:         mqtt.QoS1,
				Handler: func(topicName, message []byte) {
					fmt.Println(string(topicName), string(message))
				},
			},
		},
	})
	if err != nil {
		panic(err)
	}

	// Publish a message.
	err = cli.Publish(&client.PublishOptions{
		QoS:       mqtt.QoS0,
		TopicName: []byte("bar/baz"),
		Message:   []byte("testMessage"),
	})
	if err != nil {
		panic(err)
	}

	// Unsubscribe from topics.
	err = cli.Unsubscribe(&client.UnsubscribeOptions{
		TopicFilters: [][]byte{
			[]byte("foo"),
		},
	})
	if err != nil {
		panic(err)
	}

	// Wait for receiving a signal.
	<-sigc

	// Disconnect the Network Connection.
	if err := cli.Disconnect(); err != nil {
		panic(err)
	}
}

Details about APIs

CONNECT – Client requests a connection to a Server

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Connect to the MQTT Server.
err := cli.Connect(&client.ConnectOptions{
	// Network is the network on which the Client connects to.
	Network:         "tcp",
	// Address is the address which the Client connects to.
	Address:         "iot.eclipse.org:1883",
	// TLSConfig is the configuration for the TLS connection.
	// If this property is not nil, the Client tries to use TLS
	// for the connection.
	TLSConfig:       nil,
	// CONNACKTimeout is timeout in seconds for the Client
	// to wait for receiving the CONNACK Packet after sending
	// the CONNECT Packet.
	CONNACKTimeout:  10,
	// PINGRESPTimeout is timeout in seconds for the Client
	// to wait for receiving the PINGRESP Packet after sending
	// the PINGREQ Packet.
	PINGRESPTimeout: 10,
	// ClientID is the Client Identifier of the CONNECT Packet.
	ClientID:        []byte("clientID"),
	// UserName is the User Name of the CONNECT Packet.
	UserName:        []byte("userName"),
	// // Password is the Password of the CONNECT Packet.
	Password:        []byte("password"),
	// CleanSession is the Clean Session of the CONNECT Packet.
	CleanSession:    true,
	// KeepAlive is the Keep Alive of the CONNECT Packet.
	KeepAlive:       30,
	// WillTopic is the Will Topic of the CONNECT Packet.
	WillTopic:       []byte("willTopic"),
	// WillMessage is the Will Message of the CONNECT Packet.
	WillMessage:     []byte("willMessage"),
	// WillQoS is the Will QoS of the CONNECT Packet.
	WillQoS:         mqtt.QoS0,
	// WillRetain is the Will Retain of the CONNECT Packet.
	WillRetain:      true,
})
if err != nil {
	panic(err)
}

CONNECT using TLS

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Read the certificate file.
b, err := ioutil.ReadFile("/path/to/crtFile")
if err != nil {
	panic(err)
}

roots := x509.NewCertPool()
if ok := roots.AppendCertsFromPEM(b); !ok {
	panic("failed to parse root certificate")
}

tlsConfig = &tls.Config{
	RootCAs: roots,
}

// Connect to the MQTT Server using TLS.
err := cli.Connect(&client.ConnectOptions{
	// Network is the network on which the Client connects to.
	Network:         "tcp",
	// Address is the address which the Client connects to.
	Address:         "iot.eclipse.org:1883",
	// TLSConfig is the configuration for the TLS connection.
	// If this property is not nil, the Client tries to use TLS
	// for the connection.
	TLSConfig:       tlsConfig,
})
if err != nil {
	panic(err)
}

SUBSCRIBE - Subscribe to topics

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Subscribe to topics.
err = cli.Subscribe(&client.SubscribeOptions{
	SubReqs: []*client.SubReq{
		&client.SubReq{
			// TopicFilter is the Topic Filter of the Subscription.
			TopicFilter: []byte("foo"),
			// QoS is the requsting QoS.
			QoS:         mqtt.QoS0,
			// Handler is the handler which handles the Application Message
			// sent from the Server.
			Handler: func(topicName, message []byte) {
				fmt.Println(string(topicName), string(message))
			},
		},
		&client.SubReq{
			TopicFilter: []byte("bar/#"),
			QoS:         mqtt.QoS1,
			Handler: func(topicName, message []byte) {
				fmt.Println(string(topicName), string(message))
			},
		},
	},
})
if err != nil {
	panic(err)
}

PUBLISH – Publish message

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Publish a message.
err = cli.Publish(&client.PublishOptions{
	// QoS is the QoS of the PUBLISH Packet.
	QoS:       mqtt.QoS0,
	// Retain is the Retain of the PUBLISH Packet.
	Retain:    true,
	// TopicName is the Topic Name of the PUBLISH Packet.
	TopicName: []byte("bar/baz"),
	// Message is the Application Message of the PUBLISH Packet.
	Message:   []byte("testMessage"),
})
if err != nil {
	panic(err)
}

UNSUBSCRIBE – Unsubscribe from topics

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Unsubscribe from topics.
err = cli.Unsubscribe(&client.UnsubscribeOptions{
	// TopicFilters represents a slice of the Topic Filters.
	TopicFilters: [][]byte{
		[]byte("foo"),
	},
})
if err != nil {
	panic(err)
}

DISCONNECT – Disconnect the Network Connection

// Create an MQTT Client.
cli := client.New(&client.Options{
	ErrorHandler: func(err error) {
		fmt.Println(err)
	},
})

// Terminate the Client.
defer cli.Terminate()

// Disconnect the network connection.
if err := cli.Disconnect(); err != nil {
	panic(err)
}

MQTT Client Command Line Application

After the installation, you can launch an MQTT client command line application by executing the gmq-cli command.

$ gmq-cli
gmq-cli>

You can see all GMQ Client commands by executing the help GMQ Client command.

gmq-cli> help
GMQ Client 0.0.1
Usage:
conn     establish a Network Connection and send a CONNECT Packet to the Server
disconn  send a DISCONNECT Packet to the Server and disconnect the Network Connection
help     print this help message
pub      send a PUBLISH Packet to the Server
quit     quit this process
sub      send a SUBSCRIBE Packet to the Server
unsub    send a UNSUBSCRIBE Packet to the Server

You can see all flags of a GMQ Client command by executing the command with the -help flag.

gmq-cli> conn -help
Usage:
  -P="": Password
  -c=true: Clean Session
  -crt="": the path of the certificate authority file to verify the server connection
  -ct=30: Timeout in seconds for the Client to wait for receiving the CONNACK Packet after sending the CONNECT Packet
  -h="localhost": host name of the Server which the Client connects to
  -i="": Client identifier for the Client
  -k=60: Keep Alive measured in seconds
  -n="tcp": network on which the Client connects to the Server
  -p=1883: port number of the Server which the Client connects to
  -pt=30: Timeout in seconds for the Client to wait for receiving the PINGRESP Packet after sending the PINGREQ Packet
  -u="": User Name
  -wm="": Will Message
  -wq=0: Will QoS
  -wr=false: Will Retain
  -wt="": Will Topic

gmq's People

Contributors

62che avatar kjmrknsn avatar ridha avatar yosssi avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

gmq's Issues

How to reliably wait for a message to get sent?

I have a simple application that just sends one single message and then terminates:

func main() {
	cli = ...

	err = cli.Publish(&client.PublishOptions{
		QoS: mqtt.QoS2,
		TopicName: []byte("foo/bar"),
		Message:   []byte("test"),
	})
	if err != nil {
		log.Fatalf("publish failed: %v", err)
	}
}

Unfortunately the message never gets sent although Publish returns nil. The reason for this behavior is the asynchronous message processing within the library.

Is there any way to reliably wait for a message to get sent?

I could imagine that initializing the cli.conn.send channel with a capacity of 0 (ie. setting sendBufSize = 0) could resolve the issue but I wanted to ask here first whether this is the right approach.

API provides no means to see retain flag on received messages

After a client subscribes, sometimes the first message it receives on a topic is a retained message. Such messages might be old and the client may not wish to act upon such a message so it is useful to be able to distinguish this case. The protocol is such that messages published by the server to the client only have the retain flag set if the message is such a message (that is even if the client that published it set the retain flag the server resets it when it is published if it is sent straightaway.) There is currently no way to distinguish these messages as the handler interface doesn't seem to expose the retain flag.

Not sure if there is a good way to add this while retaining backward compatibility sadly.

websocket transport

Just wanted to get your thoughts on providing a websocket transport, or possibly HTTP/2 in the future.

Cluster addresses

Hi, and thanks for a great library. I have been using it with great success.

I am wondering however, if this library supports cluster connections like so:

"iot-01.eclipse.org:1883,iot-02.eclipse.org:1883,iot-03.eclipse.org:1883"

With the ability to connect to an alternate node if the current node becomes unavailable?

Disconnected by AWS IoT MQTT Broker

Dropping the TLS example almost line-by-line into a test program for AWS IoT caused the server to close the connection when the first subscribe packet is sent.

When I put a time.Sleep(time.Second) in between the cli.Connect and cli.Subscribe calls, then it works just fine. It seems that AWS will close the connection if the subscribe comes too quickly. It seems like the key is to wait until that CONNACK packet is received before continuing with the other messages.

Can we add some sort of event or callback for CONNACK? Like a ConnectedHandler or maybe a buffered channel that sends connection lifecycle events that the main program can read?

SubACK failure not reported to toplevel.

Hello,
I have a use case where it is required to me to have topics restricted by ACLs.
In one of such scenarios; I had serious problem of debug, because when a subscription fails, it does not report this event to the toplevel.
To facilitate my usage of this library I modified the client.go file as follow

var (
     	ErrAlreadyConnected = errors.New("the Client has already connected to the Server")
        ErrNotYetConnected  = errors.New("the Client has not yet connected to the Server")
        ErrCONNACKTimeout   = errors.New("the CONNACK Packet was not received within a reasonalbe amount of time")
        ErrPINGRESPTimeout  = errors.New("the PINGRESP Packet was not received within a reasonalbe amount of time")
        ErrPacketIDExhaused = errors.New("Packet Identifiers are exhausted")
        ErrInvalidPINGRESP  = errors.New("invalid PINGRESP Packet")
        ErrInvalidSUBACK    = errors.New("invalid SUBACK Packet")  //added to report suback failed event
        ErrDeniedSUBACK     = errors.New("Subdenied")
)




// handleSUBACK handles the SUBACK Packet.
func (cli *Client) handleSUBACK(p packet.Packet) error {
        // Lock for update.
        cli.muConn.Lock()
        cli.muSess.Lock()

        // Unlock.
        defer cli.muConn.Unlock()
        defer cli.muSess.Unlock()

        // Extract the Packet Identifier of the Packet.
        id := p.(*packet.SUBACK).PacketID

        // Validate the Packet Identifier.
        if err := cli.validatePacketID(cli.sess.sendingPackets, id, packet.TypeSUBSCRIBE); err != nil {
                return err
        }

	// Get the subscription requests of the SUBSCRIBE Packet.
        subreqs := cli.sess.sendingPackets[id].(*packet.SUBSCRIBE).SubReqs

        // Delete the SUBSCRIBE Packet from the Session.
        delete(cli.sess.sendingPackets, id)

        // Get the Return Codes of the SUBACK Packet.
        returnCodes := p.(*packet.SUBACK).ReturnCodes

        // Check the lengths of the Return Codes.
        if len(returnCodes) != len(subreqs) {
                return ErrInvalidSUBACK
        }

	// Set the subscriptions to the Network Connection.
        for i, code := range returnCodes {
                // Skip if the Return Code is failure.
                if code == packet.SUBACKRetFailure {
                        return ErrDeniedSUBACK //added to report suback failed event
                        continue
                }

                // Get the Topic Filter.
                topicFilter := string(subreqs[i].TopicFilter)

                // Move the subscription information from
                // unackSubs to ackedSubs.
                cli.conn.ackedSubs[topicFilter] = cli.conn.unackSubs[topicFilter]
                delete(cli.conn.unackSubs, topicFilter)
        }

	return nil
}

is there any reason this condition weren't reported?
Regards.

ErrInvalidPacketID on reconnect

I've got an application that I'd written using gmq that consistently failed to connect to my MQTT server. I had been subscribed to a couple of topics (wildcarded) at QoS2 and was not using clean sessions. Apparently I lost a connection with some messages in flight and on reconnect, the broker was attempting to redeliver lost messages, confusing the client.

I was able to make my application work again by dropping to QoS0 and getting a clean session (probably only the latter mattered), but I didn't notice the loss for several hours, so I can't make a reliable app currently with QoS > 0 and reusing sessions.

Reconnect to server

Just wanted to get your thoughts on the best way to reconnect to an MQTT server using your library.

I noted you have a "handler" for disconnect and wanted to know if you had tested using that to initiate a reconnect?

I am just doing some testing this with this library and at the moment that is the only question.

invalid PINGRESP Packet

Hi.
Thanks for excellent MQTT client!
I use your client together with the broker from hrotti in one application.
After a while after start of my application (in which your client is connected to the broker, subscribes for a topic and waits for messages), the client falls with an error: "invalid PINGRESP Packet"
As I understand, the server for some reason ceases to respond to ping. And how it is possible to make reconnect in case of miss of the response to ping from the broker?
That the client didn't fall, and tried to be reconnected?

Packet Identifiers are exhausted

Hi,

I am using this package for the development of a publisher service. The service just takes a CSV data file and reads it line by line. It then sends a message via mqtt for each line. This works nicely but after some time logrus shows this error message in the log:

ERRO[4733] Error publishing message                      error="Packet Identifiers are exhausted"

What happened here? How can I prevent this? Is the message lost or will the client auto retry sending it?

EDIT: Restarting the service solves the problem, it then works again for some time but after a few minutes it shows the error again.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.