chrispulman / mqttnet.rx Goto Github PK
View Code? Open in Web Editor NEWA Reactive MQTTnet Client and other Industrial protocols to use with the client. ManagedClient, TwinCat, ABPlc, Modbus, SerialPort, S7Plc
License: MIT License
A Reactive MQTTnet Client and other Industrial protocols to use with the client. ManagedClient, TwinCat, ABPlc, Modbus, SerialPort, S7Plc
License: MIT License
Hi, Thanks for creating a great libraries.
I have question regarding MQTTnet.Rx.Client package to share same client connection for all message subscription and publishing.
I see multiple client connections connected to the server. How can be use to share same client instance?
internal class Program {
static async Task Main(string[] args) {
var serverPort = 2883;
var mqttServerOptions = new MqttServerOptionsBuilder()
.WithDefaultEndpointPort(serverPort)
.WithDefaultEndpoint().Build();
var mqttFactory = new MqttFactory();
var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions);
mqttServer.ClientConnectedAsync += (args) => {
Console.WriteLine($"SERVER: ClientConnectedAsync => clientId:{args.ClientId}");
return Task.CompletedTask;
};
mqttServer.ClientDisconnectedAsync += (args) => {
Console.WriteLine($"SERVER: ClientDisconnectedAsync => clientId:{args.ClientId}");
return Task.CompletedTask;
};
Console.WriteLine("Starting MQTT server...");
await mqttServer.StartAsync();
Console.WriteLine("MQTT server is started.");
Console.WriteLine("------------------------------------\n");
var obsClient = Create.ManagedMqttClient()
.WithManagedClientOptions(options => {
options.WithClientOptions(c => {
c.WithTcpServer("localhost", serverPort);
//c.WithClientId("Client01");
});
options.WithAutoReconnectDelay(TimeSpan.FromSeconds(2));
});
obsClient.Subscribe(i => {
i.Connected().Subscribe((args) => {
Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Connected with server.");
});
i.Disconnected().Subscribe((args) => {
Console.WriteLine($"{DateTime.Now.Dump()}\t CLIENT: Disconnected with server.");
});
});
var s1 = obsClient.SubscribeToTopic("FromMilliseconds")
.Subscribe(r => {
Console.WriteLine($"\tCLIENT S1: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}");
});
var s2 = obsClient.SubscribeToTopic("FromMilliseconds")
.Subscribe(r => {
Console.WriteLine($"\tCLIENT S2: {r.ReasonCode} [{r.ApplicationMessage.Topic}] value : {r.ApplicationMessage.ConvertPayloadToString()}");
});
Subject<(string topic, string payload)> _message = new();
Observable.Interval(TimeSpan.FromMilliseconds(1000)).Subscribe(i => _message.OnNext(("FromMilliseconds", "{" + $"payload: {i}" + "}")));
obsClient.PublishMessage(_message).Subscribe(r => {
//Console.WriteLine($"Publish result [{r.Exception == null}], Id:{r.ApplicationMessage.Id}, Topic:{r.ApplicationMessage.ApplicationMessage.Topic}");
});
await Task.Delay(3000);
s2.Dispose();
Console.WriteLine("Dispose S2 ---------------");
Console.Read();
}
}
Starting MQTT server...
MQTT server is started.
------------------------------------
SERVER: ClientConnectedAsync => clientId:96e2cb5dce544559b823f12b17285864
SERVER: ClientConnectedAsync => clientId:936ca9900f3c4f599d4e7a63b32890ab
SERVER: ClientConnectedAsync => clientId:c3a72d8bfba64a5cbb523bb34fcc3094
SERVER: ClientConnectedAsync => clientId:77782d2f2f124550bee68adff8d80940
03:29:20.4536 CLIENT: Connected with server.
CLIENT S2: Success [FromMilliseconds] value : {payload: 0}
CLIENT S1: Success [FromMilliseconds] value : {payload: 0}
CLIENT S1: Success [FromMilliseconds] value : {payload: 1}
CLIENT S2: Success [FromMilliseconds] value : {payload: 1}
CLIENT S1: Success [FromMilliseconds] value : {payload: 2}
CLIENT S2: Success [FromMilliseconds] value : {payload: 2}
Dispose S2 ---------------
SERVER: ClientDisconnectedAsync => clientId:c3a72d8bfba64a5cbb523bb34fcc3094
CLIENT S1: Success [FromMilliseconds] value : {payload: 3}
CLIENT S1: Success [FromMilliseconds] value : {payload: 4}
CLIENT S1: Success [FromMilliseconds] value : {payload: 5}
If I try to use constant clientId in MqttClientOptionsBuilder c.WithClientId("Client01");
Starting MQTT server...
MQTT server is started.
------------------------------------
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:48.6046 CLIENT: Connected with server.
03:30:48.6194 CLIENT: Disconnected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:50.6232 CLIENT: Connected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:50.6594 CLIENT: Disconnected with server.
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
Dispose S2 ---------------
SERVER: ClientDisconnectedAsync => clientId:Client01
SERVER: ClientConnectedAsync => clientId:Client01
03:30:51.6368 CLIENT: Connected with server.
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.