moquette-io / moquette
Java MQTT lightweight broker
Home Page: http://moquette-io.github.io/moquette/
License: Apache License 2.0
The subscription tree uses the following code to check whether a client is already subscribed to the same topic with a different QoS:
// org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java
void addSubscription(Subscription s) {
//avoid double registering for same clientID, topic and QoS
if (m_subscriptions.contains(s)) {
return;
}
//remove existing subscription for same client and topic but different QoS
int existingSubIdx = Collections.binarySearch(m_subscriptions, s, new ClientIDComparator());
if (existingSubIdx >= 0) {
m_subscriptions.remove(existingSubIdx);
}
m_subscriptions.add(s);
}
The code tries to locate a duplicate element in m_subscriptions by using binary search. However, I don't understand how this can work without sorting the list first.
With this simple test code, m_subscriptions will contain duplicate subscriptions for the same client with different QoS:
SubscriptionsStore subscriptionsStore = // init;
Subscription s1 = new Subscription("client1","client/test/a", AbstractMessage.QOSType.MOST_ONCE,true);
Subscription s2 = new Subscription("client8","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
Subscription s3 = new Subscription("client3","client/test/c", AbstractMessage.QOSType.MOST_ONCE,true);
subscriptionsStore.add(s1);
subscriptionsStore.add(s3);
subscriptionsStore.add(s2);
subscriptionsStore.add(s2);
Subscription s4 = new Subscription("client2","client/test/b", AbstractMessage.QOSType.LEAST_ONCE,true);
subscriptionsStore.add(s4);
Subscription s5 = new Subscription("client5","client/test/b", AbstractMessage.QOSType.LEAST_ONCE,true);
subscriptionsStore.add(s5);
Subscription s6 = new Subscription("client6","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
subscriptionsStore.add(s6);
Subscription s7 = new Subscription("client7","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
subscriptionsStore.add(s7);
Subscription s8 = new Subscription("client1","client/test/b", AbstractMessage.QOSType.MOST_ONCE,true);
subscriptionsStore.add(s8);
Subscription s9 = new Subscription("client1","client/test/b", AbstractMessage.QOSType.EXACTLY_ONCE,true);
subscriptionsStore.add(s9);
System.out.println(subscriptionsStore.dumpTree());
// contains two subscription of client1 with different QOS: MOST_ONCE and EXACTLY_ONCE
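A minimal sketch of one possible alternative, assuming the list is not kept sorted: replace the binary search with a linear scan that removes any entry for the same client and topic before adding the new one. The `Sub` class and `addSubscription` helper below are hypothetical stand-ins for illustration, not Moquette's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriptionDedupSketch {

    /** Hypothetical stand-in for Moquette's Subscription (client ID, topic filter, QoS). */
    public static final class Sub {
        public final String clientId;
        public final String topic;
        public final int qos;

        public Sub(String clientId, String topic, int qos) {
            this.clientId = clientId;
            this.topic = topic;
            this.qos = qos;
        }
    }

    /** Adds s, replacing any entry with the same client ID and topic, whatever its QoS. */
    public static void addSubscription(List<Sub> subs, Sub s) {
        // Linear scan: gives the same result whatever the order of the list.
        subs.removeIf(e -> e.clientId.equals(s.clientId) && e.topic.equals(s.topic));
        subs.add(s);
    }

    public static void main(String[] args) {
        List<Sub> subs = new ArrayList<>();
        addSubscription(subs, new Sub("client8", "client/test/b", 0));
        addSubscription(subs, new Sub("client2", "client/test/b", 1));
        addSubscription(subs, new Sub("client1", "client/test/b", 0));
        addSubscription(subs, new Sub("client1", "client/test/b", 2));
        for (Sub s : subs) {
            System.out.println(s.clientId + " " + s.topic + " qos=" + s.qos);
        }
    }
}
```

With this approach client1 ends up with a single entry at QoS 2. The scan is O(n) per insert; keeping the list sorted by client ID would be the other way to make binary search valid.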
Unsubscribing does not clean m_sessionsStore, although subscribing to a topic runs "m_sessionsStore.addNewSubscription(newSubscription, clientID);" in the subscribeSingleTopic function.
@MQTTMessage(message = UnsubscribeMessage.class)
void processUnsubscribe(ServerChannel session, UnsubscribeMessage msg) {
List<String> topics = msg.topicFilters();
int messageID = msg.getMessageID();
String clientID = (String) session.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
LOG.debug("UNSUBSCRIBE subscription on topics {} for clientID <{}>", topics, clientID);
for (String topic : topics) {
subscriptions.removeSubscription(topic, clientID);
}
//ack the client
UnsubAckMessage ackMessage = new UnsubAckMessage();
ackMessage.setMessageID(messageID);
LOG.info("replying with UnsubAck to MSG ID {}", messageID);
session.write(ackMessage);
}
private void subscribeSingleTopic(Subscription newSubscription, final String topic) {
LOG.info("<{}> subscribed to topic <{}> with QoS {}",
newSubscription.getClientId(), topic,
AbstractMessage.QOSType.formatQoS(newSubscription.getRequestedQos()));
String clientID = newSubscription.getClientId();
m_sessionsStore.addNewSubscription(newSubscription, clientID);
subscriptions.add(newSubscription);
//scans retained messages to be published to the new subscription
Collection<IMessagesStore.StoredMessage> messages = m_messagesStore.searchMatching(new IMatchingCondition() {
public boolean match(String key) {
return SubscriptionsStore.matchTopics(key, topic);
}
});
for (IMessagesStore.StoredMessage storedMsg : messages) {
//fire the as retained the message
LOG.debug("send publish message for topic {}", topic);
//forwardPublishQoS0(newSubscription.getClientId(), storedMsg.getTopic(), storedMsg.getQos(), storedMsg.getPayload(), true);
Integer packetID = storedMsg.getQos() == QOSType.MOST_ONE ? null :
m_messagesStore.nextPacketID(newSubscription.getClientId());
sendPublish(newSubscription.getClientId(), storedMsg.getTopic(), storedMsg.getQos(), storedMsg.getPayload(), true, packetID);
}
}
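A minimal sketch of the symmetry the report asks for, assuming two in-memory maps stand in for the session store and the subscription tree. All names here (`UnsubscribeSketch`, `sessionStore`, `topicIndex`) are hypothetical, and topic filters are treated as plain strings without wildcard matching.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UnsubscribeSketch {
    // Hypothetical stand-in for the persisted per-client subscriptions.
    private final Map<String, Set<String>> sessionStore = new HashMap<>();
    // Hypothetical stand-in for the in-memory subscription tree (topic -> clients).
    private final Map<String, Set<String>> topicIndex = new HashMap<>();

    public void subscribe(String clientId, String topic) {
        sessionStore.computeIfAbsent(clientId, k -> new HashSet<>()).add(topic);
        topicIndex.computeIfAbsent(topic, k -> new HashSet<>()).add(clientId);
    }

    /** Removes the subscription from both structures, mirroring subscribe. */
    public void unsubscribe(String clientId, String topic) {
        Set<String> clients = topicIndex.get(topic);
        if (clients != null) {
            clients.remove(clientId);
        }
        Set<String> topics = sessionStore.get(clientId);
        if (topics != null) {
            topics.remove(topic);
        }
    }

    public boolean isSubscribed(String clientId, String topic) {
        Set<String> clients = topicIndex.get(topic);
        return clients != null && clients.contains(clientId);
    }

    public Set<String> storedTopics(String clientId) {
        return sessionStore.getOrDefault(clientId, new HashSet<>());
    }
}
```

If only the first structure were updated on unsubscribe, a later reload from the stored subscriptions would bring the removed subscription back.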
Steps that reproduce the problem:
MOQUETTE.LOG
Maybe I didn't understand the logic behind the code, but I think that when the broker is started the old subscriptions need to be deleted from the DB.
I'm using Moquette v0.7 on Windows 8.1 (my dev environment)
0 [main] DEBUG SubscriptionsStore - init invoked
6 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs []
7 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before
7 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after
8 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
8 [main] DEBUG ProtocolProcessor - subscription tree on init
531 [main] INFO NettyAcceptor - Server binded
535 [main] INFO NettyAcceptor - Server binded
536 [main] INFO NettyAcceptor - SSL is disabled
10187 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
10187 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
10188 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqsub/24676-DVL
10189 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
10191 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqsub/24676-DVL>
10283 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqsub/24676-DVL>
10317 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
10319 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqsub/24676-DVL> with clean session true
10320 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10321 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL is a newcome, creating it's subscriptions set
10322 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10331 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type SUBSCRIBE
10331 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping SUBSCRIBE
10344 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10384 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping SUBSCRIBE
10384 [pool-1-thread-1] DEBUG ProtocolProcessor - processSubscribe invoked from client mosqsub/24676-DVL with msgID 1
10385 [pool-1-thread-1] INFO ProtocolProcessor - <mosqsub/24676-DVL> subscribed to topic 78:25:44:7E:E9:A0/D with QoS 0 - MOST_ONE
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true] for client mosqsub/24676-DVL
10385 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqsub/24676-DVL subscriptions set with new subscription
10386 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqsub/24676-DVL subscriptions set now is [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
10422 [pool-1-thread-1] DEBUG MapDBPersistentStore - searchMatching scanning all retained messages, presents are 0
10422 [pool-1-thread-1] DEBUG ProtocolProcessor - replying with SubAck to MSG ID 1
0 [main] DEBUG SubscriptionsStore - init invoked
13223 [main] DEBUG MapDBPersistentStore - retrieveAllSubscriptions returning subs [[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true], [filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]]
15744 [main] DEBUG SubscriptionsStore - Reloading all stored subscriptions...subscription tree before
33059 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic 78:25:44:7E:E9:A0/D
58676 [main] DEBUG SubscriptionsStore - Re-subscribing mosqsub/24676-DVL to topic
66856 [main] DEBUG SubscriptionsStore - Finished loading. Subscription tree after
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
87979 [main] INFO FileAuthenticator - Loading password file: C:\bin\eclipse43.workspaces\spazio1\moquette-mqtt-broker\src\config\password_file.conf
87980 [main] DEBUG ProtocolProcessor - subscription tree on init
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
88685 [main] INFO NettyAcceptor - Server binded
88687 [main] INFO NettyAcceptor - Server binded
88689 [main] INFO NettyAcceptor - SSL is disabled
95949 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type CONNECT
95950 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping CONNECT
95951 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect for client mosqpub/27720-DVL
95952 [pool-1-thread-1] DEBUG ProtocolProcessor - Connect with keepAlive 60 s
95955 [pool-1-thread-1] DEBUG SubscriptionsStore - Activating subscriptions for clientID <mosqpub/27720-DVL>
125014 [pool-1-thread-1] INFO ProtocolProcessor - cleaning old saved subscriptions for client <mosqpub/27720-DVL>
125072 [pool-1-thread-1] DEBUG ProtocolProcessor - processConnect sent OK ConnAck
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
125098 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - addNewSubscription invoked with subscription [filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true] for client mosqpub/27720-DVL
125098 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL is a newcome, creating it's subscriptions set
125099 [pool-1-thread-1] DEBUG MapDBPersistentStore - updating clientID mosqpub/27720-DVL subscriptions set with new subscription
125100 [nioEventLoopGroup-3-1] INFO PublishDecoder - decode invoked with buffer UnpooledUnsafeDirectByteBuf(ridx: 1, widx: 833, cap: 1024)
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type PUBLISH
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping PUBLISH
125100 [nioEventLoopGroup-3-1] INFO NettyMQTTHandler - Received a message of type DISCONNECT
125100 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event ProtocolEvent wrapping DISCONNECT
125103 [nioEventLoopGroup-3-1] DEBUG SimpleMessaging - disruptorPublish publishing event org.dna.mqtt.moquette.messaging.spi.impl.events.LostConnectionEvent@ff6c19
125115 [pool-1-thread-1] DEBUG MapDBPersistentStore - clientID mosqpub/27720-DVL subscriptions set now is [[filter:, cliID: mosqpub/27720-DVL, qos: MOST_ONE, active: true]]
125153 [pool-1-thread-1] INFO SimpleMessaging - onEvent processing messaging event from input ringbuffer ProtocolEvent wrapping PUBLISH
125153 [pool-1-thread-1] TRACE ProtocolProcessor - PUB --PUBLISH--> SRV processPublish invoked with org.dna.mqtt.moquette.proto.messages.PublishMessage@15dcd1d
125153 [pool-1-thread-1] INFO ProtocolProcessor - Publish recieved from clientID <mosqpub/27720-DVL> on topic 78:25:44:7E:E9:A0/D with QoS MOST_ONE
125153 [pool-1-thread-1] DEBUG ProtocolProcessor - publish2Subscribers republishing to existing subscribers that matches the topic 78:25:44:7E:E9:A0/D
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
125155 [pool-1-thread-1] DEBUG ProtocolProcessor - subscription tree
78:25:44:7E:E9:A0
D[filter:78:25:44:7E:E9:A0/D, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
[filter:, cliID: mosqsub/24676-DVL, qos: MOST_ONE, active: true]
133913 [pool-1-thread-1] DEBUG ProtocolProcessor - Broker republishing to client <mosqsub/24676-DVL> topic 78:25:44:7E:E9:A0/D qos <MOST_ONE>, active true
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - sendPublish invoked clientId <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D QoS MOST_ONE retained false messageID 1
145081 [pool-1-thread-1] INFO ProtocolProcessor - send publish message to <mosqsub/24676-DVL> on topic 78:25:44:7E:E9:A0/D
145081 [pool-1-thread-1] DEBUG ProtocolProcessor - content <{XYZ}>
145082 [pool-1-thread-1] DEBUG ProtocolProcessor - clientIDs are {mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}
Server started, version 0.7-SNAPSHOT
125098 [pool-1-thread-1] WARN ProtocolProcessor - Connected client ID <mosqpub/27720-DVL> with clean session true
dic 02, 2014 8:17:45 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 1 org.dna.mqtt.moquette.messaging.spi.impl.ValueEvent@c625e1
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:156)
at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:1)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:113)
... 3 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.dna.mqtt.moquette.messaging.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client <mosqsub/24676-DVL> in cache <{mosqpub/27720-DVL=ConnectionDescriptor{m_clientID=mosqpub/27720-DVL, m_cleanSession=true}}>
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:410)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:386)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.publish2Subscribers(ProtocolProcessor.java:359)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:306)
at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
Reporter: diegovis
Original: https://code.google.com/p/moquette-mqtt/issues/detail?id=54
Error scenario
A connects to the server (clean session false) -> subscribes to /sys/abc -> disconnects
B connects to the server (clean session false) -> publishes a message (topic /sys/abc, QoS 2)
A connects to the server (clean session false) ... server error
Error log
13223 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
13232 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
13232 [pool-1-thread-1] INFO ProtocolProcessor - No stored messages for client
34268 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
34274 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
34274 [pool-1-thread-1] INFO ProtocolProcessor - No stored messages for client
44705 [pool-1-thread-1] INFO ProtocolProcessor - subscribed to topic </sys/abc> with QoS 0 - MOST_ONE
47234 [pool-1-thread-1] INFO ProtocolProcessor - DISCONNECT client with clean session false
60281 [pool-1-thread-1] INFO ProtocolProcessor - PUBLISH from clientID on topic </sys/abc> with QoS EXACTLY_ONCE
64265 [pool-1-thread-1] INFO ProtocolProcessor - Create persistent session for clientID
64269 [pool-1-thread-1] INFO ProtocolProcessor - Connected client ID with clean session false
64270 [pool-1-thread-1] INFO ProtocolProcessor - republishing stored messages to client
64270 [pool-1-thread-1] INFO ProtocolProcessor - send publish message to on topic </sys/abc>
64272 [pool-1-thread-1] ERROR SimpleMessaging - Serious error processing the message Connect [clientID: AAAAA, prot: MQTT, ver: 04, clean: false] for session [clientID: AAAAA]org.eclipse.moquette.server.netty.NettyChannel@78c36c6f
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Internal bad error, trying to forwardPublish a QoS 0 message with PacketIdentifier: 0
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:416)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.republishStoredInSession(ProtocolProcessor.java:263)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnect(ProtocolProcessor.java:240)
... 11 more
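The log above shows a QoS 0 subscription receiving a message that was published with QoS 2. In MQTT the delivery QoS is the lower of the publish QoS and the subscription QoS, and a QoS 0 PUBLISH carries no packet identifier. A minimal sketch of that rule, assuming hypothetical helper names (`effectiveQos`, `packetIdFor`) that are not part of Moquette:

```java
public class QosDowngradeSketch {

    /** Delivery QoS is the lower of the publish QoS and the subscription QoS. */
    public static int effectiveQos(int publishQos, int subscriptionQos) {
        return Math.min(publishQos, subscriptionQos);
    }

    /**
     * Returns null for QoS 0 (no packet identifier allowed), otherwise the
     * identifier supplied by the caller. nextId is a hypothetical parameter.
     */
    public static Integer packetIdFor(int qos, int nextId) {
        return qos == 0 ? null : Integer.valueOf(nextId);
    }

    public static void main(String[] args) {
        int qos = effectiveQos(2, 0); // stored QoS 2 message, QoS 0 subscription
        Integer packetId = packetIdFor(qos, 1);
        System.out.println("qos=" + qos + " packetId=" + packetId);
    }
}
```

The point of the sketch is the ordering: the packet identifier is derived from the QoS that will actually be used on the wire, not from the QoS of the stored message.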
Test case
Client A -----> connects to the server
Client A publishes a message (retain: false)
The error occurs with QoS 0, 1, or 2
Error log:
ERROR SimpleMessaging - Serious error processing the message org.eclipse.moquette.proto.messages.PublishMessage@18f93d85 for session [clientID: A]org.eclipse.moquette.server.netty.NettyChannel@1335f392
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:166)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:56)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client in cache <{A=ConnectionDescriptor{m_clientID=A, m_cleanSession=true}}>
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:411)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.forward2Subscribers(ProtocolProcessor.java:360)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:288)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
The CONNECT message processing code, ProtocolProcessor.processConnect, first tries to close an existing connection if it has the same clientID as the current requester:
void processConnect(ServerChannel session, ConnectMessage msg) {
// it should check requester first before close old connection
if( m_clientIDs.contains(requester.clientID)){
// close old connection here
}
if (authenticator != null && !authenticator.checkValid(msg.isUserFlag() ? msg.getUsername() : null, msg.isPasswordFlag() ? msg.getPassword().toCharArray() : null))
{
// ...
}
}
It is probably more reasonable to first check whether the current requester is valid, because this prevents a malicious client (one that has not been authorized) from breaking an existing connection.
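A minimal sketch of the proposed ordering, assuming a simple user/password map and a clientID-to-connection map. The class and method names are hypothetical, and the string "connection tag" stands in for a real channel object.

```java
import java.util.HashMap;
import java.util.Map;

public class ConnectOrderSketch {
    private final Map<String, String> passwords;                      // user -> password
    private final Map<String, String> connections = new HashMap<>();  // clientID -> connection tag

    public ConnectOrderSketch(Map<String, String> passwords) {
        this.passwords = new HashMap<>(passwords);
    }

    /** Returns true if the connection was accepted. */
    public boolean connect(String clientId, String user, String password, String connectionTag) {
        // 1. Validate the requester first; on failure the existing connection is untouched.
        String expected = passwords.get(user);
        if (expected == null || !expected.equals(password)) {
            return false;
        }
        // 2. Only an authenticated requester may replace the connection for this clientID.
        connections.put(clientId, connectionTag);
        return true;
    }

    public String activeConnection(String clientId) {
        return connections.get(clientId);
    }
}
```

With this order, an unauthenticated CONNECT that reuses a victim's clientID is rejected before the broker touches the victim's connection.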
Hi,
currently a client can connect without supplying any username and password, even if the configuration is set to use a password file (password_file parameter in moquette.conf). However, if the client specifies a username and password, they will be compared with the ones in password_file.
Mosquitto solves this with the allow_anonymous configuration parameter, and moquette could have a similar parameter.
Thank you
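A minimal sketch of how such a switch could behave, assuming a boolean flag modeled on Mosquitto's allow_anonymous. The `accept` method and its parameters are hypothetical and do not describe an existing Moquette option.

```java
import java.util.Map;

public class AnonymousPolicySketch {

    /**
     * Decides whether a CONNECT is accepted.
     * allowAnonymous is a hypothetical flag; passwordFile maps user -> password.
     */
    public static boolean accept(boolean allowAnonymous, String user, String password,
                                 Map<String, String> passwordFile) {
        if (user == null) {
            // No credentials supplied: accepted only when anonymous access is enabled.
            return allowAnonymous;
        }
        String expected = passwordFile.get(user);
        return expected != null && expected.equals(password);
    }
}
```

With the flag set to false, a client that omits the username is rejected instead of bypassing the password file.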
Error below.
Note: is there also a way to provide the jars for 0.7 in the meantime? We have been unable to build via Maven.
Using Maven version 3.2.5
Referencing https://github.com/andsel/moquette
20:39:10,312 [pool-1-thread-1] ERROR SimpleMessaging onEvent 167 - Grave error processing the message org.eclipse.moquette.proto.messages.PublishMessage@1bf0f5b for session [clientID: Publisher]org.eclipse.moquette.server.netty.NettyChannel@1d44d0d
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:161)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:51)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Can't find a ConnectionDescriptor for client in cache <{Publisher=ConnectionDescriptor{m_clientID=Publisher, m_cleanSession=true}, Subscriber=ConnectionDescriptor{m_clientID=Subscriber, m_cleanSession=false}}>
at org.eclipse.moquette.spi.impl.ProtocolProcessor.sendPublish(ProtocolProcessor.java:411)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.forward2Subscribers(ProtocolProcessor.java:360)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:294)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:281)
... 11 more
Are you planning to support this kind of function?
Thanks
ACL support like Mosquitto
unresolved dependency
Could not find artifact org.mapdb:mapdb:jar:1.1.0-SNAPSHOT in Paho Releases (https://repo.eclipse.org/content/repositories/paho-releases/)
Why not use Netty's epoll model to increase processing capacity?
Hi,
I tried to set up the MQTT broker on an Android device, but I always get this error:
java.net.BindException: bind failed: EADDRNOTAVAIL (Cannot assign requested address)
Should I change HOST "0.0.0.0" to a new local ip like "192.168.178.111"?
io.netty.handler.codec.CorruptedFrameException: Received a message with fixed header flags (a) != expected (2)
at org.dna.mqtt.moquette.parser.netty.DemuxDecoder.genericDecodeCommonHeader(DemuxDecoder.java:62)
at org.dna.mqtt.moquette.parser.netty.DemuxDecoder.decodeCommonHeader(DemuxDecoder.java:44)
at org.dna.mqtt.moquette.parser.netty.PubRelDecoder.decode(PubRelDecoder.java:36)
at org.dna.mqtt.moquette.parser.netty.MQTTDecoder.decode(MQTTDecoder.java:71)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:232)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:131)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:253)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:478)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:447)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:341)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:744)
Test case: Moquette 0.7 (update of 2015-1-26)
Client: Eclipse Paho Java client, version 1.0.1
Client a (clean session false)
Client b (clean session false)
a -> connects to the server
b -> connects to the server
b -> subscribes to "abc/topic" (QoS 0)
b -> disconnects or loses the connection
a -> publish("abc/topic", "test1", qos=2, false);
...
a -> publish("abc/topic", "test10", qos=2, false);
b -> connects to the server
received message test1
...
received message test10
b -> disconnects from the server or loses the connection
b -> connects to the server
received message test1
...
received message test10
Why are the messages received again?
Moquette uses a ring buffer to publish/batch events to ProtocolProcessor, and ProtocolProcessor uses another ring buffer to dispatch output events to Netty.
By default, the ring buffer pre-allocates an array of event objects (ValueEvent), and every event object stays referenced by this array until the disruptor is closed.
The problem here is that Moquette (SimpleMessaging.disruptorPublish and ProtocolProcessor.disruptorPublish) associates Netty's channel (ServerChannel) with the event object, which means the ring buffer keeps references to Netty channels until the corresponding slot is overwritten.
Every established Netty channel holds quite a lot of data (such as pooled byte buffers, channel pipelines...), so if the JVM's memory is lower than Netty's channel size * buffer size, an out-of-memory error will be thrown.
It may be a good idea to clear the event object (it contains a reference to Netty's channel) after the event has been processed. For example, in ProtocolProcessor, we may use:
/**
* Called when a publisher has published an event to the {@link RingBuffer}
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
* @throws Exception if the EventHandler would like the exception handled further up the chain.
*/
@Override
public void onEvent(ValueEvent event, long sequence, boolean endOfBatch) throws Exception {
//It's always of type OutputMessagingEvent
OutputMessagingEvent outEvent = (OutputMessagingEvent) event.getEvent();
try {
final ServerChannel serverChannel = outEvent.getChannel();
final AbstractMessage outputEvent = outEvent.getMessage();
if(LOG.isDebugEnabled()){
LOG.debug("Output event, sending {}", outputEvent);
}
serverChannel.write(outputEvent);
}
finally {
// free event resource
event.setEvent(null);
}
}
The same code should be applied to SimpleMessaging as well.
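The retention effect can be illustrated without the Disruptor library. The following is a minimal sketch, assuming a hand-rolled array of pre-allocated slots: `RingSlotSketch` and `Slot` are hypothetical stand-ins for the ring buffer and ValueEvent, not the LMAX API.

```java
public class RingSlotSketch {

    /** Hypothetical stand-in for ValueEvent: a reusable slot holding one payload. */
    public static final class Slot {
        Object payload;
    }

    private final Slot[] slots;

    public RingSlotSketch(int size) {
        // Slots are allocated once and live as long as the buffer does.
        slots = new Slot[size];
        for (int i = 0; i < size; i++) {
            slots[i] = new Slot();
        }
    }

    public void publish(long sequence, Object payload) {
        slots[(int) (sequence % slots.length)].payload = payload;
    }

    /** Hands the payload to the caller and clears the slot so it no longer pins the object. */
    public Object consume(long sequence) {
        Slot slot = slots[(int) (sequence % slots.length)];
        Object payload = slot.payload;
        slot.payload = null; // without this line the payload stays reachable until overwritten
        return payload;
    }

    /** Number of slots that still reference a payload. */
    public int retained() {
        int count = 0;
        for (Slot slot : slots) {
            if (slot.payload != null) {
                count++;
            }
        }
        return count;
    }
}
```

Clearing the slot on consumption means at most the unprocessed events hold channel references, rather than one per slot of the buffer.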
Hi,
my goal is to be able to use moquette with my own IAuthenticator.
What I think is necessary to achieve this :
If you agree, I'd like to do that in my fork and then create a PR.
Are you OK with that?
Check the behaviour of Moquette against https://issues.oasis-open.org/browse/MQTT-58
It looks like it's failing when building the broker, see below (note: it seems to be getting past the prior error with the db files).
results :
Failed tests: testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest)
checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): expected:<2> but was:<1>
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS0]> but was:<[Test my payload]>
checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my payload]>
checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerInte
rationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my
ayload]>
checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerInte
rationQoSValidationTest): expected:<[Hello world MQTT QoS2]> but was:<[Test my
ayload]>
checkSupportSSL(org.eclipse.moquette.server.ServerIntegrationSSLTest)
checkRestartCleanSubscriptionTree(org.eclipse.moquette.server.ServerRestartInt
grationTest): expected:<[Hello world MQTT!!]> but was:<[Test my payload]>
checkDontPublishInactiveClientsAfterServerRestart(org.eclipse.moquette.server.
erverRestartIntegrationTest): expected:<[Hello world MQTT!!]> but was:<[Test my
payload]>
overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest
: The DB storagefile C:\Users\Brian\moquette_store.mapdb already exists
overridingSubscriptions(org.eclipse.moquette.spi.impl.MapDBPersistentStoreTest
: Error deleting the moquette db file C:\Users\Brian\moquette_store.mapdb
ests run: 88, Failures: 25, Errors: 0, Skipped: 0
INFO] ------------------------------------------------------------------------
INFO] Reactor Summary:
INFO]
INFO] Moquette MQTT parent ............................... SUCCESS [ 0.781 s]
INFO] Moquette - Parser Commons .......................... SUCCESS [ 12.247 s]
INFO] Moquette - Netty Parser ............................ SUCCESS [ 9.518 s]
INFO] Moquette - broker .................................. FAILURE [01:11 min]
INFO] Moquette - distribution ............................ SKIPPED
INFO] Moquette - OSGi bundle ............................. SKIPPED
INFO] ------------------------------------------------------------------------
INFO] BUILD FAILURE
INFO] ------------------------------------------------------------------------
INFO] Total time: 01:34 min
INFO] Finished at: 2015-04-07T07:07:29-04:00
INFO] Final Memory: 16M/44M
INFO] ------------------------------------------------------------------------
ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.
2.4:test (default-test) on project moquette-broker: There are test failures.
ERROR]
ERROR] Please refer to C:\Users\Brian\Documents\GitHub\moquette\broker\target\s
refire-reports for the individual test results.
ERROR] -> [Help 1]
ERROR]
ERROR] To see the full stack trace of the errors, re-run Maven with the -e swit
h.
ERROR] Re-run Maven using the -X switch to enable full debug logging.
ERROR]
ERROR] For more information about the errors and possible solutions, please rea
the following articles:
ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureExc
ption
ERROR]
ERROR] After correcting the problems, you can resume the build with the command
ERROR] mvn -rf :moquette-broker
cmd' is not recognized as an internal or external command,
perable program or batch file.
Why has the code for the other version not been committed?
I'm running: mvn clean install
Maven version: 3.3
JDK: OpenJDK 7
The build process always gets stuck at:
13:40:03,788 [pool-57-thread-1] ERROR SimpleMessaging onEvent 173 - Serious error processing the message org.eclipse.moquette.proto.messages.DisconnectMessage@5dd3c8ce for session [clientID: Publisher]org.eclipse.moquette.server.netty.NettyChannel@64953d6b
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64)
... 6 more
Caused by: java.lang.RuntimeException: Could not instantiate class
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
at org.mapdb.Store.deserialize(Store.java:296)
at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
at org.mapdb.Store.get(Store.java:146)
at org.mapdb.HTreeMap.removeInternal(HTreeMap.java:1055)
at org.mapdb.HTreeMap.remove(HTreeMap.java:1010)
at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.wipeSubscriptions(MapDBPersistentStore.java:249)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.cleanSession(ProtocolProcessor.java:280)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processDisconnect(ProtocolProcessor.java:530)
... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
... 26 more
May 14, 2015 1:40:48 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
SEVERE: Exception processing: 10 org.eclipse.moquette.spi.impl.ValueEvent@5454d285
java.lang.RuntimeException: Could not instantiate class
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
at org.mapdb.Store.deserialize(Store.java:296)
at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
at org.mapdb.Store.get(Store.java:146)
at org.mapdb.HTreeMap.putInner(HTreeMap.java:893)
at org.mapdb.HTreeMap.put(HTreeMap.java:849)
at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.updateSubscriptions(MapDBPersistentStore.java:255)
at org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore.deactivate(SubscriptionsStore.java:180)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:557)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:158)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
... 22 more
Exception in thread "pool-57-thread-1" java.lang.RuntimeException: java.lang.RuntimeException: Could not instantiate class
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not instantiate class
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:526)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1416)
at org.mapdb.SerializerBase.deserializeHashSet(SerializerBase.java:1678)
at org.mapdb.SerializerBase.access$600(SerializerBase.java:30)
at org.mapdb.SerializerBase$49.deserialize(SerializerBase.java:779)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1414)
at org.mapdb.SerializerBase.deserialize(SerializerBase.java:1402)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:156)
at org.mapdb.HTreeMap$1.deserialize(HTreeMap.java:129)
at org.mapdb.Store.deserialize(Store.java:296)
at org.mapdb.StoreWAL.get2(StoreWAL.java:600)
at org.mapdb.Store.get(Store.java:146)
at org.mapdb.HTreeMap.putInner(HTreeMap.java:893)
at org.mapdb.HTreeMap.put(HTreeMap.java:849)
at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.updateSubscriptions(MapDBPersistentStore.java:255)
at org.eclipse.moquette.spi.impl.subscriptions.SubscriptionsStore.deactivate(SubscriptionsStore.java:180)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:557)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:158)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
... 3 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2
at org.mapdb.SerializerPojo.deserializeUnknownHeader(SerializerPojo.java:492)
... 22 more
Using moquette-bundle with the Equinox console, when a publisher/subscriber attempts to connect, there is a ClassNotFoundException for class org.eclipse.moquette.proto.messages.AbstractMessage$QOSType.
I'm using version 0.7 on Ubuntu 14.10.
I'm trying to use the Moquette bundle with the Equinox console. I didn't have any problems during the compile phase. These are the bundles that I use:
id State Bundle
0 ACTIVE org.eclipse.osgi_3.10.1.v20140909-1633
1 ACTIVE org.eclipse.equinox.console_1.1.0.v20140131-1639
2 ACTIVE org.apache.felix.gogo.command_0.10.0.v201209301215
3 ACTIVE org.apache.felix.gogo.runtime_0.10.0.v201209301036
4 ACTIVE org.apache.felix.gogo.shell_0.10.0.v201212101605
5 ACTIVE io.netty.buffer_4.0.24.Final
6 ACTIVE io.netty.codec_4.0.24.Final
7 ACTIVE io.netty.codec-http_4.0.24.Final
8 ACTIVE io.netty.common_4.0.24.Final
9 ACTIVE io.netty.handler_4.0.24.Final
10 ACTIVE io.netty.transport_4.0.24.Final
11 ACTIVE org.apache.felix.javax.servlet_1.0.0
12 ACTIVE slf4j.api_1.7.10
Fragments=13
13 RESOLVED slf4j.log4j12_1.7.10
Master=12
14 ACTIVE log4j_1.2.17
15 ACTIVE org.mapdb.mapdb_1.1.0.SNAPSHOT
16 ACTIVE org.eclipse.moquette.bundle_0.7.0.SNAPSHOT
I attach the start up script and the moquette.log.
Hi,
I cloned the latest source code and ran "mvn clean package". It gives me the following build failure. Any ideas?
Results :
Failed tests: testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
checkSupportSSL(org.eclipse.moquette.server.ServerIntegrationSSLTest): The DB storagefile /home/aakash/test/java/moquette/broker/moquette_store.mapdb already exists
Tests in error:
checkPlainConnect(org.eclipse.moquette.server.ServerIntegrationWebSocketTest): Address already in use
checkPlainConnect(org.eclipse.moquette.server.ServerIntegrationWebSocketTest)
checkDontPublishInactiveClientsAfterServerRestart(org.eclipse.moquette.server.ServerRestartIntegrationTest): Address already in use
checkRestartCleanSubscriptionTree(org.eclipse.moquette.server.ServerRestartIntegrationTest): Address already in use
testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest): Address already in use
testSubscribe(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_correctlyClientSubscriptions(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testCleanSession_maintainClientSubscriptions_withServerRestart(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testRetain_maintainMessage_againstClientDestruction(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_same_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testUnsubscribe_do_not_notify_anymore_new_session(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS1_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
checkReceivePublishedMessage_after_a_reconnect_with_notCleanSession(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
testPublishReceiveWithQoS2(org.eclipse.moquette.server.ServerIntegrationPahoTest)
avoidMultipleNotificationsAfterMultipleReconnection_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationPahoTest)
connectWithCredentials(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
checkReplayofStoredPublishResumeAfter_a_disconnect_cleanSessionFalseQoS1(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
checkReplayStoredPublish_forNoCleanSession_qos1(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
checkWillTestmaentIsPublishedOnConnectionKill_noRetain(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
checkQoS2SuscriberDisconnectReceivePersistedPublishes(org.eclipse.moquette.server.ServerIntegrationFuseTest): Address already in use
checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS0ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS0ReceiveQoS1publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS0ReceiveQoS1publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS0ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS1ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS1ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS1ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS1ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS1ReceiveQoS2publishes_downgrade(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS2ReceiveQoS0publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS2ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS2ReceiveQoS1publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest): Address already in use
checkSubscriberQoS2ReceiveQoS2publishes(org.eclipse.moquette.server.ServerIntegrationQoSValidationTest)
Tests run: 133, Failures: 14, Errors: 42, Skipped: 0
When a runtime exception occurs, the broker hangs and no more messages are handled. Only restarting the broker seems to help. Is it possible to improve the exception handling?
requester: diogovis
original: https://code.google.com/p/moquette-mqtt/issues/detail?id=55
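One possible direction, shown as a minimal sketch rather than Moquette's actual fix: wrap the event handler so that a runtime exception from one event is recorded and the loop keeps processing later events. GuardedHandler is a hypothetical name; the stack traces elsewhere on this page show the Disruptor's FatalExceptionHandler rethrowing, and the assumption here is that such failures can be contained per event instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper: isolates failures so one bad event does not stop the processing loop.
final class GuardedHandler<T> implements Consumer<T> {
    private final Consumer<T> delegate;
    private final List<Throwable> failures = new ArrayList<>();

    GuardedHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void accept(T event) {
        try {
            delegate.accept(event);
        } catch (RuntimeException e) {
            // Record the failure and carry on; a real broker would log it and
            // would probably also close the offending client's connection.
            failures.add(e);
        }
    }

    // Failures seen so far, in the order they occurred.
    List<Throwable> failures() {
        return failures;
    }
}
```

The trade-off is that swallowing exceptions can hide state corruption, so the catch block is the place to log and to clean up whatever the failed event had partially changed.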
Is it possible to scan for open brokers?
If an MQTT client simply sends a SUBSCRIBE request with an invalid topic string (like this one: #MQTTClient), the moquette server just throws an exception on the console/log, without closing the client's connection or rolling back its internal state.
Note: we cannot assume that the client validates the topic string before sending it to the server. If the client doesn't, the server's internal state will be incorrect.
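A server-side check before touching any state could look like the following sketch. TopicFilters.isValid is a hypothetical helper, not part of Moquette; it applies the MQTT wildcard rules as I understand them ('#' only as the whole last level, '+' only as a whole level, no empty filter, no null character).

```java
// Hypothetical helper; not part of Moquette's API.
final class TopicFilters {
    private TopicFilters() {
    }

    // Returns true if the subscription filter follows the MQTT wildcard rules.
    static boolean isValid(String filter) {
        if (filter == null || filter.isEmpty() || filter.indexOf('\0') >= 0) {
            return false;
        }
        // limit -1 keeps trailing empty levels, e.g. "a/" has levels "a" and "".
        String[] levels = filter.split("/", -1);
        for (int i = 0; i < levels.length; i++) {
            String level = levels[i];
            if (level.equals("#")) {
                // Multi-level wildcard must be the last level.
                if (i != levels.length - 1) {
                    return false;
                }
            } else if (level.equals("+")) {
                // Single-level wildcard occupying a whole level is fine.
                continue;
            } else if (level.indexOf('#') >= 0 || level.indexOf('+') >= 0) {
                // Wildcard characters mixed with other characters in one level.
                return false;
            }
        }
        return true;
    }
}
```

With a check like this, a filter such as #MQTTClient is rejected up front, and the broker can close the connection before the subscription tree is modified.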
The org.eclipse.moquette.server.netty.metrics.MessageMetricsCollector class contains an in-memory queue called allMetrics, which counts messages read and written on all channels.
However, it only adds a MessageMetrics object once a channel gets closed, and there is no way to remove MessageMetrics objects, so it will eventually run out of memory.
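One way to avoid the unbounded queue, sketched under the assumption that only the global totals are needed: fold each closed channel's counts into two counters instead of retaining one object per channel. AggregatedMetrics and its method names are hypothetical.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical replacement for an ever-growing queue of per-channel metrics.
final class AggregatedMetrics {
    private final LongAdder messagesRead = new LongAdder();
    private final LongAdder messagesWritten = new LongAdder();

    // Called once when a channel closes, with that channel's final counts.
    void addChannelTotals(long read, long written) {
        messagesRead.add(read);
        messagesWritten.add(written);
    }

    long totalRead() {
        return messagesRead.sum();
    }

    long totalWritten() {
        return messagesWritten.sum();
    }
}
```

Memory use stays constant no matter how many channels come and go, at the cost of losing the per-channel breakdown.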
Hi,
my problem is about sending the will message. When I simulate a lost connection, I have the following error:
2015-02-19 20:15:15,476 INFO ProtocolProcessor - Lost connection with client <0.0.1>
2015-02-19 20:15:15,477 INFO ProtocolProcessor - Publish received from clientID <0.0.1> on topic <topic/diagnosticServer> with QoS EXACTLY_ONCE
feb 19, 2015 8:15:15 PM com.lmax.disruptor.FatalExceptionHandler handleEventException
GRAVE: Exception processing: 5 org.eclipse.moquette.spi.impl.ValueEvent@149e8424
java.lang.NullPointerException
at org.eclipse.moquette.spi.impl.events.PublishEvent.<init>(PublishEvent.java:44)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:301)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:339)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:551)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:146)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:50)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: java.lang.NullPointerException
at com.lmax.disruptor.FatalExceptionHandler.handleEventException(FatalExceptionHandler.java:45)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.eclipse.moquette.spi.impl.events.PublishEvent.<init>(PublishEvent.java:44)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:301)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:339)
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processConnectionLost(ProtocolProcessor.java:551)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:146)
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:50)
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
... 3 more
This is the client side code:
client = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setWill(dgnServerTopic, my_message.getBytes(), 2, true);
client.setCallback(this);
client.connect(connOpts);
The subscription tree uses the following code to check whether a client is subscribed to the same topic with a different QoS:
// org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java
void addSubscription(Subscription s) {
//avoid double registering for same clientID, topic and QoS
if (m_subscriptions.contains(s)) {
return;
}
//remove existing subscription for same client and topic but different QoS
...
}
The problem here: if a client sends SUBSCRIBE with the same clientID, topic and QoS, but a different cleanSession flag (Subscription.m_cleanSession), the code ignores the changed cleanSession flag.
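A minimal sketch of one possible fix, assuming a subscription is identified by client ID and topic only: always replace the existing entry, so that a changed QoS or cleanSession flag takes effect. Sub and NodeSubscriptions are simplified, hypothetical stand-ins for Subscription and TreeNode.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified subscription; field names are illustrative.
final class Sub {
    final String clientId;
    final String topic;
    final int qos;
    final boolean cleanSession;

    Sub(String clientId, String topic, int qos, boolean cleanSession) {
        this.clientId = clientId;
        this.topic = topic;
        this.qos = qos;
        this.cleanSession = cleanSession;
    }

    // Identity of a subscription for replacement purposes: client and topic only.
    boolean sameClientAndTopic(Sub other) {
        return clientId.equals(other.clientId) && topic.equals(other.topic);
    }
}

// Hypothetical stand-in for the subscription list held by a tree node.
final class NodeSubscriptions {
    private final List<Sub> subs = new ArrayList<>();

    // Removes any entry for the same client and topic, then stores the new one,
    // so the latest QoS and cleanSession values always win.
    void add(Sub s) {
        subs.removeIf(existing -> existing.sameClientAndTopic(s));
        subs.add(s);
    }

    // Linear lookup; returns null when the client has no entry for the topic.
    Sub find(String clientId, String topic) {
        for (Sub s : subs) {
            if (s.clientId.equals(clientId) && s.topic.equals(topic)) {
                return s;
            }
        }
        return null;
    }

    int size() {
        return subs.size();
    }
}
```

The linear scan also sidesteps the need to keep the list sorted, which a binary search would require.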
Requesting secure WebSocket (wss) support.
The org.eclipse.moquette.server.netty.NettyMQTTHandler class contains a HashMap field (m_channelMapper):
public class NettyMQTTHandler extends ChannelInboundHandlerAdapter{
private final Map<ChannelHandlerContext, NettyChannel> m_channelMapper = new HashMap<ChannelHandlerContext, NettyChannel>();
}
which seems to be used to maintain the mapping between Netty's channel and Moquette's ServerChannel.
However, when running on a heavily loaded server, this HashMap only grows without ever removing anything, and finally runs out of memory.
The problems that I found:
- m_channelMapper uses ChannelHandlerContext as its key; however, Netty's ChannelHandlerContext doesn't implement hashCode & equals:
if (!m_channelMapper.containsKey(ctx)) {
m_channelMapper.put(ctx, new NettyChannel(ctx));
}
- On a DISCONNECT event, only ProtocolProcessor processes the event, and m_channelMapper does not remove the associated value (NettyChannel).
- The ChannelHandler.exceptionCaught method will be called, but NettyMQTTHandler doesn't implement that method to clean up the associated value (NettyChannel).
- ... m_channelMapper, and every channel holds quite a lot of data (such as pooled byte buffers, channel pipelines...).
Some fix suggestions:
public class NettyMQTTHandler extends ChannelInboundHandlerAdapter
{
private final ChannelFutureListener remover = future -> removeSession(future.channel());
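// Named "sessions" to match its uses below; it replaces the old m_channelMapper field and is keyed by ChannelId.
private final Map<ChannelId, NettyChannel> sessions = new HashMap<ChannelId, NettyChannel>();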
public void channelRead(ChannelHandlerContext ctx, Object message) {
// ....
Channel channel = ctx.channel();
NettyChannel protocolChannel = sessions.computeIfAbsent(channel.id(), key -> {
// register close hook, when ProtocolProcessor close the channel, we remove associated value.
channel.closeFuture().addListener(remover);
return new NettyChannel(ctx);
});
messageDispatcher.handleProtocolMessage(protocolChannel, msg);
}
private void removeSession(Channel channel){
if(sessions.remove(channel.id()) != null){
channel.closeFuture().removeListener(remover);
}
}
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
// Always try to report the lost connection, because the physical connection may already be closed.
// Scenario:
// 1. The client sends a CONNECT message and terminates immediately (the CONNECT message has not been processed yet).
// 2. ProtocolProcessor processes the CONNECT message with a dead NettyChannel (already removed from sessions).
// 3. The timer triggers this method, and we try to report the event if possible.
//
if(ctx.hasAttr(NettyChannel.ATTR_KEY_CLIENTID)){
String clientID = (String) ctx.attr(NettyChannel.ATTR_KEY_CLIENTID).get();
if(clientID != null && ! clientID.isEmpty()){
// NOTE: this requires comparing NettyChannel instances with equals() rather than by reference (A == B).
messageDispatcher.lostConnection(new NettyChannel(ctx), clientID);
}
// Otherwise, this channel has presumably been cleaned up already.
}
Channel channel = ctx.channel();
NettyChannel protocolChannel = sessions.get(channel.id());
if(protocolChannel == null){
ctx.close();
return;
}
removeSession(channel);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
NettyChannel protocolChannel = sessions.get(channel.id());
// protocolChannel may be null if the client never wrote anything (channelRead was never called).
if(protocolChannel != null){
String clientID = (String) protocolChannel.getAttribute(NettyChannel.ATTR_KEY_CLIENTID);
// clientID may be null if the client sent a CONNECT message and terminated the connection immediately (the CONNECT message has not been processed yet).
// In that case we can only hope that the CONNECT message gets processed properly and sets up the timer (which calls channelInactive) to clean up resources.
// Otherwise, we propagate the event and clean up resources immediately.
if(clientID != null && ! clientID.isEmpty()){
messageDispatcher.lostConnection(protocolChannel, clientID);
}
removeSession(channel);
}
// Close the connection explicitly just in case the transport
// did not close the connection automatically.
if (channel.isActive()) {
ctx.close();
}
// we are last handler, no need to propagate exception.
// super.exceptionCaught(ctx, cause);
}
}
public class NettyChannel {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof NettyChannel)) return false;
NettyChannel that = (NettyChannel) o;
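// Compare by channel id; guard against a null channel on either side.
if (m_channel == null || that.m_channel == null) return m_channel == that.m_channel;
return m_channel.channel().id().equals(that.m_channel.channel().id());
}
@Override
public int hashCode() {
// Must stay consistent with equals(): derive the hash from the channel id.
return m_channel != null ? m_channel.channel().id().hashCode() : 0;
}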
}
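The core idea of the suggestion above, independent of Netty, is to register the cleanup hook at the moment a map entry is created, so the entry cannot outlive the connection. The following is only a minimal sketch under assumptions: `SessionRegistry` and `Connection` are hypothetical names that are not part of Moquette, and a `CompletableFuture` stands in for Netty's `closeFuture()`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SessionRegistry {

    /** Hypothetical stand-in for a network connection with a stable id. */
    public static final class Connection {
        public final String id;
        // Completed when the connection closes; plays the role of closeFuture().
        public final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

        public Connection(String id) {
            this.id = id;
        }

        public void close() {
            closeFuture.complete(null);
        }
    }

    // Keyed by the connection id, which has well-defined equals/hashCode.
    private final Map<String, Object> sessions = new ConcurrentHashMap<>();

    /** Returns the session for the connection, creating it on first use. */
    public Object sessionFor(Connection conn) {
        Object fresh = new Object();
        Object existing = sessions.putIfAbsent(conn.id, fresh);
        if (existing != null) {
            return existing;
        }
        // Register the cleanup exactly once, when the entry is first created.
        // remove(key, value) only removes the entry this call inserted.
        conn.closeFuture.thenRun(() -> sessions.remove(conn.id, fresh));
        return fresh;
    }

    public int size() {
        return sessions.size();
    }
}
```

The hook is registered outside any map callback on purpose: if the connection is already closed, `thenRun` executes immediately, and modifying the map from inside `computeIfAbsent` would not be safe.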
The subscription tree uses the following code to check whether a client is already subscribed to the same topic with a different QoS:
// org.eclipse.moquette.spi.impl.subscriptions.TreeNode.java
void addSubscription(Subscription s) {
//avoid double registering for same clientID, topic and QoS
if (m_subscriptions.contains(s)) {
return;
}
//remove existing subscription for same client and topic but different QoS
...
}
The problem here: if a client sends SUBSCRIBE with the same clientID, topic and QoS but a different cleanSession flag (Subscription.m_cleanSession), the code ignores the changed flag, because the m_subscriptions.contains(s) check returns early and keeps the old subscription.
As a result, when a client publishes a message, the server can never store QoS 1 messages for a subscriber that changed its cleanSession flag from true to false (via reconnect).
// This check is never true if the client first connects with cleanSession = true and subscribes
// to something, then crashes, reconnects with cleanSession = false, and subscribes to the same topics:
// `subscriber.isCleanSession()` still returns true.
if (!subscriber.isCleanSession() && !subscriber.isActive()) {
//clone the event with matching clientID and store it for future publish.
messagesStore.storePublishForFuture(new PublishEvent(topic, publishQos, message, retain, subscriberClientId, publishEventMsgID));
// ...
}
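One possible way to avoid the stale flag is to treat clientID plus topic as the identity of a subscription and always replace the stored entry, so that a changed QoS or cleanSession value wins. This is a minimal sketch under assumptions, not the Moquette implementation: `SubscriptionNode` and `Sub` are simplified, hypothetical stand-ins for `TreeNode` and `Subscription`.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriptionNode {

    /** Simplified, hypothetical stand-in for Moquette's Subscription. */
    public static final class Sub {
        public final String clientId;
        public final String topic;
        public final int qos;
        public final boolean cleanSession;

        public Sub(String clientId, String topic, int qos, boolean cleanSession) {
            this.clientId = clientId;
            this.topic = topic;
            this.qos = qos;
            this.cleanSession = cleanSession;
        }
    }

    private final List<Sub> subscriptions = new ArrayList<>();

    /** Adds s, replacing any earlier subscription of the same client to the same topic. */
    public void addSubscription(Sub s) {
        // Linear scan: no ordering of the list is assumed, so no binary search.
        subscriptions.removeIf(old ->
                old.clientId.equals(s.clientId) && old.topic.equals(s.topic));
        // The newest SUBSCRIBE wins, including its QoS and cleanSession flag.
        subscriptions.add(s);
    }

    public List<Sub> subscriptions() {
        return subscriptions;
    }
}
```

With this shape, a client that reconnects with cleanSession = false and subscribes again overwrites the entry created during its earlier cleanSession = true session.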
Please raise the clientId length limit from 23 characters to a larger value for production use.
I know this is a standard aimed at constrained devices, but it is a real limitation when the clientId is used for device identification.
I think 1024 would be better.
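A configurable limit could look roughly like the sketch below. `ClientIdPolicy` is a hypothetical class, not something Moquette provides; the default of 23 reflects the MQTT 3.1 limit, while MQTT 3.1.1 permits servers to accept longer identifiers. Rejecting empty identifiers is a simplifying assumption of this sketch.

```java
public class ClientIdPolicy {

    /** Client identifier limit from the MQTT 3.1 specification. */
    public static final int MQTT_3_1_MAX_LENGTH = 23;

    private final int maxLength;

    public ClientIdPolicy(int maxLength) {
        if (maxLength < 1) {
            throw new IllegalArgumentException("maxLength must be >= 1");
        }
        this.maxLength = maxLength;
    }

    /** Accepts non-empty identifiers of at most maxLength characters. */
    public boolean isValid(String clientId) {
        return clientId != null
                && !clientId.isEmpty()
                && clientId.length() <= maxLength;
    }
}
```

A broker could then build the policy from a configuration value, for example `new ClientIdPolicy(1024)`, instead of hard-coding 23.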
Created at:
io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:55)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:83)
org.dna.mqtt.moquette.parser.netty.PublishEncoder.encode(PublishEncoder.java:53)
org.dna.mqtt.moquette.parser.netty.PublishEncoder.encode(PublishEncoder.java:27)
org.dna.mqtt.moquette.parser.netty.MQTTEncoder.encode(MQTTEncoder.java:57)
org.dna.mqtt.moquette.parser.netty.MQTTEncoder.encode(MQTTEncoder.java:30)
io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:111)
io.netty.channel.DefaultChannelHandlerContext.invokeWrite(DefaultChannelHandlerContext.java:666)
io.netty.channel.DefaultChannelHandlerContext.write(DefaultChannelHandlerContext.java:724)
io.netty.channel.DefaultChannelHandlerContext.write(DefaultChannelHandlerContext.java:659)
org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler.write(MessageMetricsHandler.java:53)
io.netty.channel.DefaultChannelHandlerContext.invokeWrite(DefaultChannelHandlerContext.java:666)
io.netty.channel.DefaultChannelHandlerContext.access$2000(DefaultChannelHandlerContext.java:30)
io.netty.channel.DefaultChannelHandlerContext$AbstractWriteTask.write(DefaultChannelHandlerContext.java:945)
io.netty.channel.DefaultChannelHandlerContext$WriteAndFlushTask.write(DefaultChannelHandlerContext.java:999)
io.netty.channel.DefaultChannelHandlerContext$AbstractWriteTask.run(DefaultChannelHandlerContext.java:934)
io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:370)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:353)
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
java.lang.Thread.run(Thread.java:745)
When using Moquette 0.6 as an embedded MQTT server, the exception above was caught. Is this normal?
Is it possible to get the list of currently connected clients?
Hi
I'm not sure if it is okay to leave a question on this issue page, but here it goes.
I am wondering whether Moquette can be clustered.
Thank you. Apologies if questions are not appropriate on this board.
Support
There is no heartbeat support. I tried this MQTT Android library, https://github.com/chinesejie/paho-for-android, and it keeps breaking the code.
First of all, my English is not very good, please forgive me! While using Moquette, I think I found several problems.
In the org.eclipse.moquette.spi.impl.ProtocolProcessor class, there is this statement:
if (m_clientIDs.get(clientId) == null) {
throw new RuntimeException(String.format("Can't find a ConnectionDescriptor for client <%s> in cache <%s>", clientId, m_clientIDs));
}
This blocks the publisher.
Also, when the broker is delivering a message to a persistent client and the client suddenly closes the connection or throws an exception, the broker never receives the acknowledgement that delivery completed. After that, the broker never delivers the related messages to that client again.
Reporter: bppause
Original: https://code.google.com/p/moquette-mqtt/issues/detail?id=53
I'm using version 0.6 with the index.html you provided and mqttws31.js downloaded from http://www.eclipse.org/paho/clients/js/. When I hit the connect button, I get the following error message on the server:
18:35:27,746 DEBUG io.netty.util.ResourceLeakDetector:76 - -Dio.netty.noResourceLeakDetection: false
18:35:27,783 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - [id: 0xc41b4658, /127.0.0.1:2410 => /127.0.0.1:8080] WS Version V13 server handshake
18:35:27,786 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - WS Version 13 Server Handshake key: 2WP6OSP2ldJdlDY5NPZxBw==. Response: pzANkZagf3dP0kQ33Jq8676lnQI=.
18:35:27,789 WARN io.netty.channel.DefaultChannelPipeline:151 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.NullPointerException
at org.dna.mqtt.moquette.server.netty.NettyMQTTHandler.channelInactive(NettyMQTTHandler.java:81)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:214)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
at io.netty.handler.codec.http.HttpObjectAggregator.channelInactive(HttpObjectAggregator.java:219)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.handler.codec.ReplayingDecoder.channelInactive(ReplayingDecoder.java:347)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelInactive(DefaultChannelHandlerContext.java:232)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelInactive(DefaultChannelHandlerContext.java:218)
at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:767)
at io.netty.channel.AbstractChannel$AbstractUnsafe$5.run(AbstractChannel.java:558)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:354)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:348)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:662)
18:35:28,098 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - [id: 0x03e036d5, /127.0.0.1:2412 => /127.0.0.1:8080] WS Version V13 server handshake
18:35:28,099 DEBUG io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker:71 - WS Version 13 Server Handshake key: SCvw+oSgc9B2l6x3dMc0SA==. Response: lF1UZN3/15mn05D1WxND16bjzsw=.
18:35:28,105 DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder:76 - Decoding WebSocket Frame opCode=2
18:35:28,106 DEBUG io.netty.handler.codec.http.websocketx.WebSocket08FrameDecoder:76 - Decoding WebSocket Frame length=24
18:35:28,108 WARN io.netty.channel.DefaultChannelPipeline:151 - An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:263)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:131)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler$1.channelRead(WebSocketServerProtocolHandler.java:133)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:173)
at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:332)
at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:318)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:100)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:478)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:447)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:341)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
at java.lang.Thread.run(Thread.java:662)
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1171)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1117)
at io.netty.buffer.AbstractByteBuf.getByte(AbstractByteBuf.java:330)
at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:563)
at org.dna.mqtt.moquette.parser.netty.Utils.decodeRemainingLenght(Utils.java:75)
at org.dna.mqtt.moquette.parser.netty.Utils.checkHeaderAvailability(Utils.java:47)
at org.dna.mqtt.moquette.parser.netty.MQTTDecoder.decode(MQTTDecoder.java:55)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:232)
... 22 more
Hello, we are seeing the following issue after publishing a large message to the broker. The message in question is an application log and can be around 1 MB.
Note that the message is published and received by the subscriber clients, but after the following error we are unable to publish any more messages until the broker is restarted.
If you need any further information, feel free to ask.
Serious error processing the message org.eclipse.moquette.proto.messages.PublishMessage@1731782d for session [clientID: spin-device-16]org.eclipse.moquette.server.netty.NettyChannel@6c2bd990
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:66) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:167) [moquette-broker-0.7-SNAPSHOT.jar:na]
at org.eclipse.moquette.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:57) [moquette-broker-0.7-SNAPSHOT.jar:na]
at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128) [disruptor-3.3.2.jar:na]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_71]
Caused by: java.lang.reflect.InvocationTargetException: null
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_71]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_71]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_71]
at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_71]
at org.eclipse.moquette.spi.impl.AnnotationSupport.dispatch(AnnotationSupport.java:64) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
... 6 common frames omitted
Caused by: org.mapdb.DBException$VolumeIOError: IO failed
at org.mapdb.Volume$RandomAccessFileVol.getLong(Volume.java:1896) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.Volume$ReadOnly.getLong(Volume.java:1672) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.StoreDirect.offsetsGet(StoreDirect.java:357) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.StoreDirect.delete2(StoreDirect.java:413) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.StoreCached.flushWriteCacheSegment(StoreCached.java:349) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.StoreWAL.commit(StoreWAL.java:760) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.mapdb.DB.commit(DB.java:2227) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
at org.eclipse.moquette.spi.persistence.MapDBPersistentStore.cleanInFlight(MapDBPersistentStore.java:190) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:309) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
at org.eclipse.moquette.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:295) ~[moquette-broker-0.7-SNAPSHOT.jar:na]
... 11 common frames omitted
Caused by: java.io.EOFException: null
at java.io.RandomAccessFile.readInt(RandomAccessFile.java:827) ~[na:1.7.0_71]
at java.io.RandomAccessFile.readLong(RandomAccessFile.java:860) ~[na:1.7.0_71]
at org.mapdb.Volume$RandomAccessFileVol.getLong(Volume.java:1894) ~[mapdb-2.0.0-SNAPSHOT.jar:na]
... 20 common frames omitted
Do you think it would be OK to allow system properties in the values of the properties in the configuration file?
We could extend the parse method of the ConfigurationParser to substitute system properties in the value (using: https://commons.apache.org/proper/commons-lang/javadocs/api-2.6/org/apache/commons/lang/text/StrSubstitutor.html).
If someone wants to configure the Moquette database path in the configuration file without using an absolute path, they could use e.g. the value "${moquette.path}/storage/moquette_store.mapdb"
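The substitution itself is small enough to sketch without the commons-lang dependency. The following is only an illustration of the idea, assuming `${name}` placeholders; `PropertyResolver` is a hypothetical helper, not part of Moquette's ConfigurationParser, and leaving unknown placeholders untouched is a design choice of this sketch.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PropertyResolver {

    // Matches ${name} and captures the property name.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces every ${name} with System.getProperty(name), if that property is set. */
    public static String resolve(String value) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String replacement = System.getProperty(m.group(1));
            if (replacement == null) {
                // Unknown property: keep the placeholder text as it was.
                replacement = m.group(0);
            }
            // quoteReplacement keeps '$' and '\' in the value literal.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Started with `-Dmoquette.path=/opt/moquette`, the example value from the request would resolve to `/opt/moquette/storage/moquette_store.mapdb`.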
import org.dna.mqtt.moquette.messaging.spi.IMatchingCondition;
import org.dna.mqtt.moquette.messaging.spi.impl.HawtDBPersistentStore.StoredMessage;
import org.dna.mqtt.moquette.messaging.spi.impl.events.PublishEvent;
import org.dna.mqtt.moquette.proto.messages.AbstractMessage.QOSType;
public interface IMessagesStore {
void initStore();
void storeRetained(String var1, ByteBuffer var2, QOSType var3);
Collection<StoredMessage> searchMatching(IMatchingCondition var1);
StoredMessage is declared protected, so it cannot be accessed by other classes.
Running org.eclipse.moquette.connection.ConnectionIT
[ERROR] Internal Error. Sending error to client
java.lang.RuntimeException: Script not found: org/eclipse/moquette/connection/connect.then.close.rpt
at org.kaazing.robot.driver.control.handler.ControlServerHandler.prepareReceived(ControlServerHandler.java:148)
at org.kaazing.robot.driver.control.handler.ControlUpstreamHandler.messageReceived(ControlUpstreamHandler.java:40)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
[ERROR] Internal Error. Sending error to client
java.lang.RuntimeException: Script not found: org/eclipse/moquette/connection/connect.with.invalid.WillQoS.rpt
at org.kaazing.robot.driver.control.handler.ControlServerHandler.prepareReceived(ControlServerHandler.java:148)
at org.kaazing.robot.driver.control.handler.ControlUpstreamHandler.messageReceived(ControlUpstreamHandler.java:40)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Tests run: 3, Failures: 0, Errors: 2, Skipped: 1, Time elapsed: 0.212 sec <<< FAILURE! - in org.eclipse.moquette.connection.ConnectionIT
shouldConnectThenClose(org.eclipse.moquette.connection.ConnectionIT) Time elapsed: 0.127 sec <<< ERROR!
org.kaazing.robot.junit.RoboticException: Internal Error:Script not found: org/eclipse/moquette/connection/connect.then.close.rpt
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:126)
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:42)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.lang.Thread.run(Thread.java:722)
connectWithInvalidWillQoS(org.eclipse.moquette.connection.ConnectionIT) Time elapsed: 0.008 sec <<< ERROR!
org.kaazing.robot.junit.RoboticException: Internal Error:Script not found: org/eclipse/moquette/connection/connect.with.invalid.WillQoS.rpt
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:126)
at org.kaazing.robot.junit.rules.ScriptRunner.call(ScriptRunner.java:42)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.lang.Thread.run(Thread.java:722)
Results :
Tests in error:
ConnectionIT.shouldConnectThenClose » Robotic Internal Error:Script not found:...
ConnectionIT.connectWithInvalidWillQoS » Robotic Internal Error:Script not fou...
Tests run: 3, Failures: 0, Errors: 2, Skipped: 1
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO]
[INFO] --- robot-maven-plugin:0.0.0.12:stop (default) @ moquette-broker ---
[INFO]
[INFO] --- maven-failsafe-plugin:2.17:verify (default) @ moquette-broker ---
[INFO] Failsafe report directory: /home/andrea/workspace/moquette_github/broker/target/failsafe-reports
[WARNING] File encoding has not been set, using platform encoding UTF-8, i.e. build is platform dependent!
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Moquette MQTT parent .............................. SUCCESS [0.287s]
[INFO] Moquette - Parser Commons ......................... SUCCESS [1.669s]
[INFO] Moquette - Netty Parser ........................... SUCCESS [1.749s]
[INFO] Moquette - broker ................................. FAILURE [1:15.691s]
[INFO] Moquette - distribution ........................... SKIPPED
[INFO] Moquette - OSGi bundle ............................ SKIPPED
I am getting this error when running mvn clean package and mvn clean install.
Note: I deleted the moquette_store.mapdb and .wal files. Is there something else I was supposed to drop?
You mentioned in defect 19: "Hi, these types of errors are flapping, due to the fact (I'll fix it) that the ~/moquette.mapdb* files are not removed. Dropping those files should solve it."
I am getting this error when running mvn install pax:provision
[ERROR] Failed to execute goal on project moquette-bundle: Could not resolve dependencies for project org.eclipse.moquette:moquette-bundle:bundle:0.7-SNAPSHOT: Could not find artifact org.eclipse.moquette:moquette-broker:jar:0.7-SNAPSHOT in Paho Releases (https://repo.eclipse.org/content/repositories/paho-releases/) -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
'cmd' is not recognized as an internal or external command,
operable program or batch file.
Dear Andrea,
We need to extend Moquette to customize its authentication and authorization functionality. The code includes two authentication schemes, but it does not allow us to plug in our own by configuration. Additionally, we'd like to catch 'subscribe' events, so that an authorization check on the server decides whether the subscription request is accepted or not.
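To make the request concrete, the extension point could be as small as an interface whose implementation is named in the configuration and loaded reflectively. This is a hypothetical sketch only: `PluggableAuth`, `Authorizer` and `OwnTopicsOnly` are illustrative names and do not describe an existing Moquette API.

```java
public class PluggableAuth {

    /** Hypothetical hook consulted before a subscription is accepted. */
    public interface Authorizer {
        boolean canSubscribe(String clientId, String topic);
    }

    /** Example policy: a client may only subscribe below client/<its id>/. */
    public static class OwnTopicsOnly implements Authorizer {
        @Override
        public boolean canSubscribe(String clientId, String topic) {
            return topic.startsWith("client/" + clientId + "/");
        }
    }

    /** Instantiates the authorizer whose class name was read from the configuration. */
    public static Authorizer load(String className) {
        try {
            Class<?> type = Class.forName(className);
            // asSubclass fails fast if the class does not implement Authorizer.
            return type.asSubclass(Authorizer.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load authorizer " + className, e);
        }
    }
}
```

The broker would call `canSubscribe` while handling SUBSCRIBE and reject the request when it returns false.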
Thank you
Is it possible to publish a file? If so, could you tell me how the file should be encoded?