Giter Club home page Giter Club logo

Comments (21)

werbenhu avatar werbenhu commented on May 24, 2024 2

@mrhb6006 @thedevop @mochi-co According to MQTT spec 3.1.2.4 Clean Session , if the client does not expire, we should retain the client's subscriptions upon reconnection. However, I tested Mosquitto, and it does not support this feature. Therefore, I think making this an optional capability is a good idea.

from server.

thedevop avatar thedevop commented on May 24, 2024 1

@werbenhu , we current do retain client's subscription

server/server.go

Lines 495 to 501 in 5058333

for _, sub := range existing.State.Subscriptions.GetAll() {
existed := !s.Topics.Subscribe(cl.ID, sub) // [MQTT-3.8.4-3]
if !existed {
atomic.AddInt64(&s.Info.Subscriptions, 1)
}
cl.State.Subscriptions.Add(sub.Filter, sub)
}

We then clean up the old client:

s.UnsubscribeClient(existing)

But will not unsubscribe topics if the client is taken over.

server/server.go

Lines 1265 to 1267 in 5058333

if atomic.LoadUint32(&cl.State.isTakenOver) == 1 {
return
}

I have tested with Paho client, and it does retain the subscriptions. However, the messages are only stored if QOS > 0.

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

i use this pkg at production, plz help

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@mrhb6006 I'm too busy at the moment. I will take a look at this issue as soon as I have some free time.

from server.

torkamania avatar torkamania commented on May 24, 2024

@mrhb6006 What is the version of the library in your app?

from server.

thedevop avatar thedevop commented on May 24, 2024

@mrhb6006 , in order for the message to be saved, the following conditions need to be met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

Can you check what QOS was used for subscription and publishing?

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

@thedevop
all conditions are met
when publish with real client it is ok
but when i use inline client it is not

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

@mrhb6006 What is the version of the library in your app?

last version

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

some

tnx
I'm waiting for you

from server.

thedevop avatar thedevop commented on May 24, 2024

Can you elaborate your steps as an inline client would not experience disconnect.

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@mrhb6006 Take a look at #354 to see if it resolves your issue.

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@thedevop The testing environment I'm in right now is indeed, as you said, without any issues. I will conduct further tests when I return to the previous environment in a few days.

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.

from server.

thedevop avatar thedevop commented on May 24, 2024

@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?

Can you also re-test the saved messages, make sure the following conditions are met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?

Can you also re-test the saved messages, make sure the following conditions are met:

  1. Client session is not expired (met based on above)
  2. Client subscribed to the topic with QOS > 0
  3. Message published with QOS > 0

yes all conditions are met.
i use inline client for publish and message not save at session

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.

test with inline client publish ?

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@mrhb6006 I will test this scenario later.

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main

import (
	"bytes"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

// Options contains configuration settings for the hook.
type MyHookOptions struct {
	Server *mqtt.Server
}

// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
	mqtt.HookBase
	config *MyHookOptions
}

func (h *MyHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*MyHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*MyHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

// ID returns the ID of the hook.
func (h *MyHook) ID() string {
	return "allow-all-auth"
}

// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnectAuthenticate,
		mqtt.OnACLCheck,
		mqtt.OnPublish,
	}, []byte{b})
}

// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	return true
}

// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
	return true
}

func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
	pkx := pk
	if string(pk.TopicName) == "trigger" {
		h.config.Server.Publish("mochi", pk.Payload, false, 1);
	}
	return pkx, nil
}

func main() {
	tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})
	_ = server.AddHook(new(MyHook), &MyHookOptions{
		Server: server,
	})
	
	tcp := listeners.NewTCP("t1", *tcpAddr, nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-done
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")

}

from server.

mrhb6006 avatar mrhb6006 commented on May 24, 2024

@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.

// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co

package main

import (
	"bytes"
	"flag"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

// Options contains configuration settings for the hook.
type MyHookOptions struct {
	Server *mqtt.Server
}

// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
	mqtt.HookBase
	config *MyHookOptions
}

func (h *MyHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*MyHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*MyHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

// ID returns the ID of the hook.
func (h *MyHook) ID() string {
	return "allow-all-auth"
}

// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnectAuthenticate,
		mqtt.OnACLCheck,
		mqtt.OnPublish,
	}, []byte{b})
}

// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
	return true
}

// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
	return true
}

func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
	pkx := pk
	if string(pk.TopicName) == "trigger" {
		h.config.Server.Publish("mochi", pk.Payload, false, 1);
	}
	return pkx, nil
}

func main() {
	tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
	flag.Parse()

	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()

	server := mqtt.New(&mqtt.Options{
		InlineClient: true,
	})
	_ = server.AddHook(new(MyHook), &MyHookOptions{
		Server: server,
	})
	
	tcp := listeners.NewTCP("t1", *tcpAddr, nil)
	err := server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()

	<-done
	server.Log.Warn("caught signal, stopping...")
	_ = server.Close()
	server.Log.Info("main.go finished")

}

i mean use server.Publish("trigger",body,2) for publish
dont work for me

from server.

werbenhu avatar werbenhu commented on May 24, 2024

@mrhb6006 I have also tested the situation of subscribing and publishing to the QoS2 topic, and it works fine. You can modify my main.go to conduct the test.

from server.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.