Comments (21)
@mrhb6006 @thedevop @mochi-co According to MQTT spec 3.1.2.4 Clean Session , if the client does not expire, we should retain the client's subscriptions upon reconnection. However, I tested Mosquitto, and it does not support this feature. Therefore, I think making this an optional capability is a good idea.
from server.
@werbenhu , we current do retain client's subscription
Lines 495 to 501 in 5058333
We then clean up the old client:
Line 505 in 5058333
But will not unsubscribe topics if the client is taken over.
Lines 1265 to 1267 in 5058333
I have tested with Paho client, and it does retain the subscriptions. However, the messages are only stored if QOS > 0.
from server.
i use this pkg at production, plz help
from server.
@mrhb6006 I'm too busy at the moment. I will take a look at this issue as soon as I have some free time.
from server.
@mrhb6006 What is the version of the library in your app?
from server.
@mrhb6006 , in order for the message to be saved, the following conditions need to be met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
Can you check what QOS was used for subscription and publishing?
from server.
@thedevop
all conditions are met
when publish with real client it is ok
but when i use inline client it is not
from server.
@mrhb6006 What is the version of the library in your app?
last version
from server.
some
tnx
I'm waiting for you
from server.
Can you elaborate your steps as an inline client would not experience disconnect.
from server.
@mrhb6006 Take a look at #354 to see if it resolves your issue.
from server.
@thedevop The testing environment I'm in right now is indeed, as you said, without any issues. I will conduct further tests when I return to the previous environment in a few days.
from server.
@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.
from server.
@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?
Can you also re-test the saved messages, make sure the following conditions are met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
from server.
@mrhb6006 , can you re-test if a client retains subscription when it disconnect/reconnect?
Can you also re-test the saved messages, make sure the following conditions are met:
- Client session is not expired (met based on above)
- Client subscribed to the topic with QOS > 0
- Message published with QOS > 0
yes all conditions are met.
i use inline client for publish and message not save at session
from server.
@thedevop you are right. I tested it today, and I couldn't reproduce the issue. It might have been my mistake earlier.
test with inline client publish ?
from server.
@mrhb6006 I will test this scenario later.
from server.
@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: mochi-co
package main
import (
"bytes"
"flag"
"log"
"os"
"os/signal"
"syscall"
mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
)
// Options contains configuration settings for the hook.
type MyHookOptions struct {
Server *mqtt.Server
}
// AllowHook is an authentication hook which allows connection access
// for all users and read and write access to all topics.
type MyHook struct {
mqtt.HookBase
config *MyHookOptions
}
func (h *MyHook) Init(config any) error {
h.Log.Info("initialised")
if _, ok := config.(*MyHookOptions); !ok && config != nil {
return mqtt.ErrInvalidConfigType
}
h.config = config.(*MyHookOptions)
if h.config.Server == nil {
return mqtt.ErrInvalidConfigType
}
return nil
}
// ID returns the ID of the hook.
func (h *MyHook) ID() string {
return "allow-all-auth"
}
// Provides indicates which hook methods this hook provides.
func (h *MyHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnectAuthenticate,
mqtt.OnACLCheck,
mqtt.OnPublish,
}, []byte{b})
}
// OnConnectAuthenticate returns true/allowed for all requests.
func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool {
return true
}
// OnACLCheck returns true/allowed for all checks.
func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
return true
}
func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload))
pkx := pk
if string(pk.TopicName) == "trigger" {
h.config.Server.Publish("mochi", pk.Payload, false, 1);
}
return pkx, nil
}
func main() {
tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
flag.Parse()
sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()
server := mqtt.New(&mqtt.Options{
InlineClient: true,
})
_ = server.AddHook(new(MyHook), &MyHookOptions{
Server: server,
})
tcp := listeners.NewTCP("t1", *tcpAddr, nil)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}
go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()
<-done
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}
from server.
@mrhb6006 I tested it, and it's OK. Client A subscribes to the 'mochi' topic with QoS 1, then disconnects. Client B connects, publishes a message to the 'trigger' topic, triggering an inline publish to send a message. When Client A reconnects, it can receive the content sent by Client B. You can refer to my 'main.go' below for details. The client I used is mqttx.
// SPDX-License-Identifier: MIT // SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co // SPDX-FileContributor: mochi-co package main import ( "bytes" "flag" "log" "os" "os/signal" "syscall" mqtt "github.com/mochi-mqtt/server/v2" "github.com/mochi-mqtt/server/v2/listeners" "github.com/mochi-mqtt/server/v2/packets" ) // Options contains configuration settings for the hook. type MyHookOptions struct { Server *mqtt.Server } // AllowHook is an authentication hook which allows connection access // for all users and read and write access to all topics. type MyHook struct { mqtt.HookBase config *MyHookOptions } func (h *MyHook) Init(config any) error { h.Log.Info("initialised") if _, ok := config.(*MyHookOptions); !ok && config != nil { return mqtt.ErrInvalidConfigType } h.config = config.(*MyHookOptions) if h.config.Server == nil { return mqtt.ErrInvalidConfigType } return nil } // ID returns the ID of the hook. func (h *MyHook) ID() string { return "allow-all-auth" } // Provides indicates which hook methods this hook provides. func (h *MyHook) Provides(b byte) bool { return bytes.Contains([]byte{ mqtt.OnConnectAuthenticate, mqtt.OnACLCheck, mqtt.OnPublish, }, []byte{b}) } // OnConnectAuthenticate returns true/allowed for all requests. func (h *MyHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet) bool { return true } // OnACLCheck returns true/allowed for all checks. func (h *MyHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool { return true } func (h *MyHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) { h.Log.Info("received from client", "client", cl.ID, "payload", string(pk.Payload)) pkx := pk if string(pk.TopicName) == "trigger" { h.config.Server.Publish("mochi", pk.Payload, false, 1); } return pkx, nil } func main() { tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener") flag.Parse() sigs := make(chan os.Signal, 1) done := make(chan bool, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigs done <- true }() server := mqtt.New(&mqtt.Options{ InlineClient: true, }) _ = server.AddHook(new(MyHook), &MyHookOptions{ Server: server, }) tcp := listeners.NewTCP("t1", *tcpAddr, nil) err := server.AddListener(tcp) if err != nil { log.Fatal(err) } go func() { err := server.Serve() if err != nil { log.Fatal(err) } }() <-done server.Log.Warn("caught signal, stopping...") _ = server.Close() server.Log.Info("main.go finished") }
i mean use server.Publish("trigger",body,2) for publish
dont work for me
from server.
@mrhb6006 I have also tested the situation of subscribing and publishing to the QoS2 topic, and it works fine. You can modify my main.go to conduct the test.
from server.
Related Issues (20)
- how to override payload when do message inceptor?
- Got nil exception when inceptor publish HOT 4
- Can not get onPublish called after set Inline Client HOT 10
- invalid client status which loaded from storage. HOT 6
- consume very slow when Inflights to many HOT 4
- Packet encoding optimization HOT 1
- Data race in buffer with v2.4.3 HOT 4
- Client unsubscribe topic have problem HOT 1
- Version is "2.4.1" while the package version is v2.4.3 HOT 2
- OnACLCheck BUG HOT 11
- How to enable storage simplify HOT 16
- How to disable log HOT 1
- How to allow specific username to read/write on a specific topic, and denied enything else? HOT 1
- After enabled badger, the vlog file up to 700M one day and 4GB one week HOT 9
- Race condition when running the redis example HOT 4
- 遍历Clients时如何判断当前Client是否为Disconected状态 HOT 3
- 作者您好,请帮忙关注一下这个问题 HOT 3
- Hi, what is the simplest way to make messages can be restored when server cut off? HOT 5
- [badgerdb] vlog growing unbounded - consider adding GC and exposing options HOT 6
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from server.