
rabbitmq-jms-client's Introduction

Java JMS Client for RabbitMQ

Overview

This is a JMS client library for RabbitMQ, working in concert with rabbitmq-jms-topic-exchange, a RabbitMQ server plugin. It supports JMS 2.0 as of 2.7.0 and JMS 3.0 as of 3.0.0. Both 2.x and 3.x branches are maintained and supported.

RabbitMQ JMS Client 2.x requires Java 8 or later. RabbitMQ JMS Client 3.x requires Java 11 or later.

JMS 1.1 and 2.0

Building from Source

This project is managed by Maven, so use

./mvnw clean install -Dmaven.test.skip=true

to build it from source and install into the local repository.

Running Tests

See CONTRIBUTING.md for an overview of the development process.

Unit Tests

./mvnw clean test

Integration Tests

Running Integration Tests with Docker

Launch the broker:

docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq

Enable the JMS Topic Exchange plugin:

docker exec rabbitmq rabbitmq-plugins enable rabbitmq_jms_topic_exchange

Launch the tests:

./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq

Running Integration Tests with a Local Broker

To launch the test suite (requires a local RabbitMQ node with JMS Topic Exchange plugin enabled):

./mvnw verify -Drabbitmqctl.bin=/path/to/rabbitmqctl

JMS 1.1 Compliance Test Suite

JMS 1.1 compliance test suite for this client is available in a separate repository.

Versioning

This library uses semantic versioning.

Support

See the RabbitMQ Java libraries support page for the support timeline of this library.

(c) 2007-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.

This package, the RabbitMQ JMS client library, is double-licensed under the Apache License version 2 ("ASL") and the Mozilla Public License 2.0 ("MPL").

See LICENSE.

rabbitmq-jms-client's People

Contributors

acogoluegnes, alehane, alexshvedoff, amandeep1696, chbriem, chiragsanghavi, dcorbacho, dependabot[bot], dumbbell, fhanik, gmornet, gregturn, jjank, marcialrosales, meaghek, michaelklishin, mkuratczyk, rabbitmq-ci, remibantos, rmorgan, sdeeg-pivotal, sdurrenmatt, spring-operator, turcsanyip

rabbitmq-jms-client's Issues

Content_type needs to be configurable

The content_type in RMQMessageProducer is hardcoded to application/octet-stream for text messages, so even when the application supplies a content_type, the hardcoded value overrides it. Could this be made configurable, for example by using the content_type from the header if it is already set and falling back to the default otherwise?

Reported on rabbitmq-users

RMQObjectFactory does not work with Wildfly object-factory JNDI binding

The RMQObjectFactory class does not work with the WildFly object-factory JNDI binding configuration.
This is because the first argument (Object obj) of getObjectInstance() is handled as a javax.naming.Reference instance, whereas WildFly actually passes a java.lang.String to this method when the object-factory binding configuration is used.
As a result, the code below always throws a NamingException:

Reference ref = obj instanceof Reference ? (Reference) obj : null;

if (ref == null && (environment == null || environment.isEmpty())) {
    throw new NamingException("Unable to instantiate object: obj is not a Reference instance and environment table is empty");
}

From what I understand of the javax.naming.spi.ObjectFactory javadoc, there is no requirement to treat this parameter as a javax.naming.Reference instance only.
So, to make RMQObjectFactory work with the WildFly object-factory binding, we could allow all required properties to be provided via the Hashtable environment argument (the fourth argument) of the getObjectInstance() method.

Note that I'm going to propose a PR for that.
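
The proposal above could be sketched roughly as follows. This is only an illustration of the lookup order (Reference first, then the environment table), not the actual RMQObjectFactory code or the eventual PR; the class name EnvAwareLookup and the method property are hypothetical.

```java
import java.util.Hashtable;
import javax.naming.NamingException;
import javax.naming.RefAddr;
import javax.naming.Reference;

/** Hypothetical helper for illustration; not part of rabbitmq-jms-client. */
final class EnvAwareLookup {

    /**
     * Reads a property from the Reference when obj is one,
     * otherwise falls back to the environment table.
     */
    static String property(Object obj, Hashtable<?, ?> environment, String name)
            throws NamingException {
        if (obj instanceof Reference) {
            RefAddr addr = ((Reference) obj).get(name);
            if (addr != null && addr.getContent() != null) {
                return addr.getContent().toString();
            }
        }
        if (environment != null) {
            Object value = environment.get(name);
            if (value != null) {
                return value.toString();
            }
        }
        throw new NamingException(
                "Property [" + name + "] not found in Reference or environment");
    }
}
```

With this shape, a String first argument (as passed by WildFly according to the report above) is simply ignored and the value comes from the environment table, while an existing Reference-based configuration keeps working unchanged.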

Using receive(long timeout) to await the reply for RPC call limits throughput to 10/second

To reproduce:
Unpack and build the attached archive.
On one command-line start the server:
java -cp example-1.0-shaded.jar JMSPassiveRpcServerTest
On a second command-line run the client:
java -cp example-1.0-shaded.jar JMSRpcClientTest
Expected result: throughput in the hundreds of calls per second, especially if the ITER constant is made larger than 100.
Actual result: throughput is less than 10 calls per second.

Note that a native RabbitMQ RPC client/server test is also included in this archive (classes RpcClientTest and RpcServerTest), which achieves good performance.

My analysis:
The MessageConsumer.receive(long timeout) method is implemented using DelayedReceiver. When DelayedReceiver.get() finds that no message is available, it sleeps for 100ms before trying again. Since it is almost always the case that the server has not replied before receive() is called by an RPC client, every call tends to incur this penalty.

Suggested solution:
Implement MessageConsumer.receive() using a background receiver to accept messages and a thread-safe queue to exchange messages between background and foreground, similar to the implementation of the RabbitMQ RPCClient class. Note that this may add some complexity to the implementation, or make it impossible for a client to both call receive() AND use a MessageListener on the same Connection (however, I think this is already disallowed: you cannot both have a MessageListener and call receive()).
rabbitmq-rpc-example.zip
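
A minimal sketch of the suggested approach, assuming a background thread hands messages to the caller through a blocking queue. The class QueueBackedReceiver and its methods are hypothetical names for illustration and are not the client's API. The point is that poll with a timeout wakes up as soon as a message arrives, instead of sleeping for a fixed interval between checks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch; names are illustrative, not the client's API. */
final class QueueBackedReceiver<T> {

    // Thread-safe hand-off between the delivery thread and the caller.
    private final BlockingQueue<T> inbox = new LinkedBlockingQueue<>();

    /** Called by the background delivery thread when a message arrives. */
    void deliver(T message) {
        inbox.add(message);
    }

    /**
     * Waits up to timeoutMs for a message and returns null on timeout.
     * The caller is woken as soon as a message is delivered.
     */
    T receive(long timeoutMs) throws InterruptedException {
        return inbox.poll(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

In an RPC client, the reply latency would then be bounded by the server's response time rather than by a polling interval.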

Requeue message on unhandled client exception

In a MessageListener's onMessage(Message) method, if a runtime exception is thrown I was expecting the message to be requeued and redelivered. I am not seeing that happen.

Here is the setup:

  • Two consumers:
    1. QoS (prefetch) is set to 1
    2. Session is AUTO_ACKNOWLEDGE and NOT transacted
    3. First consumer throws runtime exception in onMessage()
    4. Second consumer processes the message
  • One Producer:
    1. Sends 10 messages

Results:

  • exception is thrown
  • consumer 1 fails and does not recover
  • messages alternate consumers, so messages 2,4,6,8 and 10 are processed
  • message 1 is delivered to consumer 1, but is never redelivered
  • messages 3,5,7,9 are then sent to the alternate consumer.

In section 4.5.2 of the JMS 1.1 spec, it states that, although it is a client error to throw in onMessage, if it does happen, messages should be re-queued and redelivered. I briefly checked out the JMS Compliant Test suite you're using, and it doesn't appear this type of test is covered.

Add pluggable conversion strategy on message consumption

Right now the conversion from the native AMQP message to a JMS message is hardcoded, which is not great for interoperability. JMS "AMQP" destinations provide some interoperability, but are limited to JMS queues. So there's currently no way that some JMS subscribers can consume messages published by non-JMS clients.

This could be implemented as a chain of responsibility, with the current behavior as the default and usually the last step when custom steps are added. An application developer should typically only focus on the payload, so helpers, e.g. for header conversion, should be provided.

Original mailing-list thread: https://groups.google.com/d/msg/rabbitmq-users/Zbcp9WL-vUM/X_M_UuurAgAJ

Cannot publish to specific rmq queue

Hi,

I'm trying to run the JMS client from a Spring environment against a RabbitMQ server.

I have it configured like this (just following the example from https://www.rabbitmq.com/jms-client.html):

  <bean id="jmsConnectionFactory" class="com.rabbitmq.jms.admin.RMQConnectionFactory">
    <property name="username" value="user" />
    <property name="password" value="test" />
    <property name="virtualHost" value="lbb" />
    <property name="host" value="localhost" />
    <property name="port" value="45671" />
    <property name="ssl" value="true" />
  </bean>

  <bean id="jmsDestination" class="com.rabbitmq.jms.admin.RMQDestination" >
    <property name="destinationName" value="site666" />
    <property name="amqp"            value="true" />
    <property name="amqpQueueName"   value="siteId-8" />
  </bean>

The problem is that it won't push the message; it throws:

3:30:01,054 DEBUG RMQConnectionFactory| Connection RMQConnection{rabbitConnection=amqp://[email protected]:45671/lbb, stopped=true, queueBrowserReadMax=0} created.
13:30:01,070 ERROR t.RMQMessageProducer| Cannot write to AMQP destination RMQDestination{destinationName='site666', queue(permanent, amqp)', amqpExchangeName='null', amqpRoutingKey='null', amqpQueueName='siteId-8'}
13:30:01,080 INFO         citrus.Citrus| 
13:30:01,080 ERROR        citrus.Citrus| TEST FAILED RabbitMQ.jmsTry <com.bbb.lbb.tests.impl.behavioral> Nested exception is: 
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is com.rabbitmq.jms.util.RMQJMSException: Cannot write to AMQP destination
	at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
	at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:559)
	at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:550)

I tried setting the exchange name to amq.default (but the logs say it cannot be found in that vhost); I also tried amq.direct as the exchange, which didn't help. Supplying the routing key does not help either.

The queues siteId-8 and site666 were previously declared, are durable, and are visible to the vhost user
(to be precise, they are normally used via AMQP).

Looking for suggestions. I just want to push a simple message to a named queue.

BR,
Gregory

Provide a way to configure onMessageTimeoutMs

The time allowed for message processing in MessageListener.onMessage(Message) is set to 2 seconds in RMQSession with constant ON_MESSAGE_EXECUTOR_TIMEOUT_MS.

For some use cases I need a much longer delay while the message is being processed on a slow backend. This works well with the Java AMQP client (which has no time limit on onMessage), but not with rabbitmq-jms-client.

Therefore, I would like to have onMessageTimeoutMs configurable as a JNDI property, as currently done with queueBrowserReadMax.
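
To illustrate what a configurable limit could look like, here is a rough sketch that runs a listener callback on an executor and waits for it with a caller-supplied timeout. How RMQSession actually enforces ON_MESSAGE_EXECUTOR_TIMEOUT_MS is not shown in this issue, so the mechanism below is an assumption, and TimedListenerInvoker is a hypothetical name.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch; not the RMQSession implementation. */
final class TimedListenerInvoker {

    private final long timeoutMs; // configurable instead of a constant
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    TimedListenerInvoker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns true if the callback finished normally within the timeout. */
    boolean invoke(Runnable onMessage) throws InterruptedException {
        Future<?> result = executor.submit(onMessage);
        try {
            result.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            result.cancel(true); // interrupt the callback that overran
            return false;
        } catch (ExecutionException e) {
            return false; // the callback threw an exception
        }
    }

    /** Stops the worker thread. */
    void shutdown() {
        executor.shutdownNow();
    }
}
```

The timeout value passed to the constructor is what a JNDI property such as the requested onMessageTimeoutMs would feed.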

CreateProcess error=2, The system cannot find the file specified

Hi,

When I build the project, I'm getting error as "Failed to execute goal org.codehaus.gmaven:groovy-maven-plugin:2.0:execute (query-test-tls-certs-dir) on project rabbitmq-jms: Execution query-test-tls-certs-dir of goal org.codehaus.gmaven:groovy-maven-plugin:2.0:execute failed: java.io.IOException: Cannot run program "make": CreateProcess error=2, The system cannot find the file specified -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.codehaus.gmaven:groovy-maven-plugin:2.0:execute (query-test-tls-certs-dir) on project rabbitmq-jms: Execution query-test-tls-certs-dir of goal org.codehaus.gmaven:groovy-maven-plugin:2.0:execute failed: java.io.IOException: Cannot run program "make": CreateProcess error=2, The system cannot find the file specified". Attached the raw log for your reference. Kindly help me with this error. Thank you.

Regards
Nandha
JMSClientBuildFail.txt

Trying to send an empty message results in an error

Sending an empty message results in this error:

org.springframework.jms.MessageEOFException: Cannot read all of non-RMQ Message body.; nested exception is javax.jms.MessageEOFException: Cannot read all of non-RMQ Message body.
	at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:294)
	at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:484)
	at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:440)
	at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:394)
	at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:157)
	at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:145)
	at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:77)
	at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:541)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:120)
	at org.apache.camel.processor.Pipeline.process(Pipeline.java:83)
	at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:198)
	at org.apache.camel.component.netty4.handlers.ServerChannelHandler.processAsynchronously(ServerChannelHandler.java:138)
	at org.apache.camel.component.netty4.handlers.ServerChannelHandler.channelRead0(ServerChannelHandler.java:109)
	at org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler.channelRead0(HttpServerChannelHandler.java:213)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.camel.component.netty4.http.handlers.HttpServerMultiplexChannelHandler.channelRead0(HttpServerMultiplexChannelHandler.java:113)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:38)
	at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:353)
	at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:745)
Caused by: javax.jms.MessageEOFException: Cannot read all of non-RMQ Message body.
	at com.rabbitmq.jms.client.message.RMQBytesMessage.recreate(RMQBytesMessage.java:561)
	at com.rabbitmq.jms.client.RMQMessage.normalise(RMQMessage.java:1356)
	at com.rabbitmq.jms.client.RMQMessageProducer.internalSend(RMQMessageProducer.java:234)
	at com.rabbitmq.jms.client.RMQMessageProducer.send(RMQMessageProducer.java:189)
	at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:626)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:592)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:531)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$100(JmsConfiguration.java:473)
	at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$1.doInJms(JmsConfiguration.java:487)
	at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:484)
	... 23 more

It seems that the problem is in RMQBytesMessage#recreate(BytesMessage), line if (bodySize != msg.readBytes(byteArray, bodySize)). If the stream is empty, readBytes returns -1, which corresponds to bodySize being 0. However with the current straightforward comparison -1 and 0 are not equal.
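
The comparison described above can be illustrated with a small sketch. BytesMessage.readBytes returns -1 when the end of the stream has been reached, so for an empty body the result should be treated as zero bytes read before comparing with the body size. The class and method names below are hypothetical and this is not the fix that was applied in the library.

```java
/** Hypothetical helper illustrating the -1 versus 0 comparison. */
final class BodyLengthCheck {

    /** Maps the end-of-stream marker (-1) to zero bytes read. */
    static int bytesActuallyRead(int readBytesResult) {
        return Math.max(readBytesResult, 0);
    }

    /** True when the number of bytes read matches the declared body size. */
    static boolean bodyFullyRead(long bodySize, int readBytesResult) {
        return bodySize == bytesActuallyRead(readBytesResult);
    }
}
```

With this check, an empty message (bodySize 0, readBytes result -1) is accepted, while a short read on a non-empty body is still reported as an error.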

Revisit serialisation versioning of key classes

See #109 for background. We only have a hypothesis there but it's a good problem to be proactive about: given that JNDI can serialise key classes from this client, we should revisit how they are versioned for [de]serialisation.

JmsMessagingTemplate.convertSendAndReceive fails re-declaring temp queue

Using springboot 1.5.11.RELEASE, org.springframework:spring-jms:4.3.15, com.rabbitmq.jms:rabbitmq-jms:1.8.1

@Autowired
JmsMessagingTemplate t;

@GetMapping("/test")
String test() {
	String resp = t.convertSendAndReceive("tq", "request", String.class);
	return resp;
}

@JmsListener(destination = "tq")
String list(Message msg) {
	System.out.println("Received message" + msg);
	return "response";
}

This results in the exception below:

Received messagecom.rabbitmq.jms.client.message.RMQTextMessage@35e0782
2018-04-09 13:42:10.604 ERROR 6340 --- [enerContainer-1] com.rabbitmq.jms.client.RMQSession : RabbitMQ exception on queue declare name(jms-temp-queue-dc5833f7-1870-4a20-b60e-cfc2b93c8dd7), durable(false), exclusive(true), auto-delete(false), properties(null)

java.io.IOException: null
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:124) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:120) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:142) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:952) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.jms.client.RMQSession.declareRMQQueue(RMQSession.java:838) [rabbitmq-jms-1.8.1.jar:1.8.1]
	at com.rabbitmq.jms.client.RMQSession.declareDestinationIfNecessary(RMQSession.java:615) [rabbitmq-jms-1.8.1.jar:1.8.1]
	at com.rabbitmq.jms.client.RMQSession.createProducer(RMQSession.java:606) [rabbitmq-jms-1.8.1.jar:1.8.1]
	at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.sendResponse(AbstractAdaptableMessageListener.java:403) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener.handleResult(AbstractAdaptableMessageListener.java:253) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:71) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057) [spring-jms-4.3.15.RELEASE.jar:4.3.15.RELEASE]
	at java.lang.Thread.run(Unknown Source) [na:1.8.0_162]
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'jms-temp-queue-dc5833f7-1870-4a20-b60e-cfc2b93c8dd7' in vhost '/', class-id=50, method-id=10)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:443) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:263) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:136) ~[amqp-client-4.3.0.jar:4.3.0]
	... 17 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'jms-temp-queue-dc5833f7-1870-4a20-b60e-cfc2b93c8dd7' in vhost '/', class-id=50, method-id=10)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:509) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:340) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:162) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:109) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:643) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47) ~[amqp-client-4.3.0.jar:4.3.0]
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:581) ~[amqp-client-4.3.0.jar:4.3.0]
	... 1 common frames omitted

How do i configure dead letter exchange from JMS API

As part of submitting a job to the queue, I want to configure a dead letter exchange for that particular routing key. With the Spring AMQP library this can be done by passing queue arguments:

QueueBuilder.durable(Constants.INCOMING_QUEUE_NAME)
                .withArgument("x-dead-letter-exchange", "")
                .withArgument("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE_NAME)
                .build();

Following is the JMS client code snippet.

RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setHost(hostName);

RMQDestination jmsDestination = new RMQDestination();
jmsDestination.setAmqp(false);
jmsDestination.setQueue(true);
jmsDestination.setAmqpExchangeName(exchanger.name());
jmsDestination.setAmqpRoutingKey(queueName);
jmsDestination.setDestinationName(queueName);
jmsDestination.setAmqpQueueName(queueName);

try (Connection connection = connectionFactory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    connection.start();
    MessageProducer producer = session.createProducer(jmsDestination);
    TextMessage msg = session.createTextMessage(JsonWrapper.toJsonUsingJackson(message));
    msg.setJMSType("TextMessage");
    producer.send(msg);
}

So is there any way to achieve the same thing with the RabbitMQ JMS client API?

Thanks.

RMQSession.createConsumer(Destination, messageSelector) throws UnsupportedOperationException without checking if destination is a topic

Hello,
In the RMQSession class, the method below checks the message selector and throws an exception if it is non-empty. It does not take into account whether the destination is a Topic (where message selectors are supported) or a Queue.

public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
    illegalStateExceptionIfClosed();
    if (nullOrEmpty(messageSelector)) {
        return createConsumer(destination);
    } else {
        // we are not implementing this method yet
        throw new UnsupportedOperationException();
    }
}

For context, I am using JmsTemplate.receiveSelected with a message selector on a topic.

Exception Resolving RMQConnectionFactory from WildFly 16.0.0 JNDI

Trying to resolve ConnectionFactory from remote JNDI on Wildfly (16.0.0-Final) results in an exception while unmarshalling.

The calling code is very simple:

    Properties environmentParameters = new Properties();
    environmentParameters.put(Context.INITIAL_CONTEXT_FACTORY, "org.wildfly.naming.client.WildFlyInitialContextFactory");
    environmentParameters.put(Context.PROVIDER_URL, "remote+http://mywildflyhost:8080");
    namingContext = new InitialContext(environmentParameters);
    ConnectionFactory connFactory = (ConnectionFactory) namingContext.lookup("jms/ConnectionFactory");

The connection factory is defined as follows in standalone-full.xml on my WildFly server:

                <object-factory name="java:jboss/exported/jms/ConnectionFactory" module="org.jboss.genericjms.provider" class="com.rabbitmq.jms.admin.RMQObjectFactory">
                    <environment>
                        <property name="className" value="javax.jms.ConnectionFactory"/>
                        <property name="uri" value="amqp://user:pass@:myrmqhost:5672"/>
                    </environment>
                </object-factory>

I defined the org.jboss.genericjms.provider module with the following module.xml:

<module name="org.jboss.genericjms.provider" xmlns="urn:jboss:module:1.5">
    <resources>
       <resource-root path="."/>
        <resource-root path="amqp-client-5.7.0.jar"/>
        <resource-root path="rabbitmq-jms-1.11.2.jar"/>
	<resource-root path="slf4j-api-1.7.25.jar"/>
    </resources>
    <dependencies>
      <module name="javax.api"/> 
      <module name="javax.jms.api"/>
    </dependencies>
</module>

This is the exception I get (ActiveBatch is the calling code):

java.lang.reflect.InvocationTargetException
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at Asci.ActiveBatch.Runtime.Messaging.CommandHandler.invoke(Unknown Source)
	at Asci.ActiveBatch.Integration.HostDomainCommandService.Invoke(Unknown Source)
	at Asci.ActiveBatch.Integration.HostDomainCommandService.Invoke(Unknown Source)
	at Asci.ActiveBatch.Integration.IntegrationService.Invoke(Unknown Source)
Caused by: javax.naming.CommunicationException: WFNAM00020: Remote naming operation failed [Root exception is java.io.EOFException: Read past end of file]
	at org.wildfly.naming.client.remote.RemoteClientTransport.lookup(RemoteClientTransport.java:261)
	at org.wildfly.naming.client.remote.RemoteContext.lambda$lookupNative$0(RemoteContext.java:190)
	at org.wildfly.naming.client.NamingProvider.performExceptionAction(NamingProvider.java:222)
	at org.wildfly.naming.client.remote.RemoteContext.performWithRetry(RemoteContext.java:100)
	at org.wildfly.naming.client.remote.RemoteContext.lookupNative(RemoteContext.java:188)
	at org.wildfly.naming.client.AbstractFederatingContext.lookup(AbstractFederatingContext.java:74)
	at org.wildfly.naming.client.store.RelativeFederatingContext.lookupNative(RelativeFederatingContext.java:58)
	at org.wildfly.naming.client.AbstractFederatingContext.lookup(AbstractFederatingContext.java:74)
	at org.wildfly.naming.client.AbstractFederatingContext.lookup(AbstractFederatingContext.java:60)
	at org.wildfly.naming.client.AbstractFederatingContext.lookup(AbstractFederatingContext.java:66)
	at org.wildfly.naming.client.WildFlyRootContext.lookup(WildFlyRootContext.java:144)
	at javax.naming.InitialContext.lookup(InitialContext.java:417)
	at Asci.ActiveBatch.JMSAgent.JMSConsumer.receiveMsg(Unknown Source)
	at Asci.ActiveBatch.JMSAgent.JMSManager.receiveMsg(Unknown Source)
	at Asci.ActiveBatch.JMSAgent.JMSService.ReceiveMessageHandler(Unknown Source)
	... 8 more
Caused by: java.io.EOFException: Read past end of file
	at org.jboss.marshalling.SimpleDataInput.eofOnRead(SimpleDataInput.java:151)
	at org.jboss.marshalling.SimpleDataInput.readUnsignedByteDirect(SimpleDataInput.java:294)
	at org.jboss.marshalling.SimpleDataInput.readUnsignedByte(SimpleDataInput.java:249)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:220)
	at org.jboss.marshalling.river.RiverUnmarshaller.readFields(RiverUnmarshaller.java:1853)
	at org.jboss.marshalling.river.RiverUnmarshaller.doInitSerializable(RiverUnmarshaller.java:1767)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadNewObject(RiverUnmarshaller.java:1395)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:272)
	at org.jboss.marshalling.river.RiverUnmarshaller.doReadObject(RiverUnmarshaller.java:205)
	at org.jboss.marshalling.AbstractObjectInput.readObject(AbstractObjectInput.java:41)
	at org.wildfly.naming.client.remote.RemoteClientTransport.lookup(RemoteClientTransport.java:243)
	... 22 more
Caused by: an exception which occurred:
	in field com.rabbitmq.jms.admin.RMQConnectionFactory.amqpConnectionFactoryPostProcessor
	in object com.rabbitmq.jms.admin.RMQConnectionFactory@6293abcc
	in object of type com.rabbitmq.jms.admin.RMQConnectionFactory

This seems to be caused by a problem with unmarshalling the Consumer field amqpConnectionFactoryPostProcessor, which was added in commit da91f5d.
I can verify that with rabbitmq-jms-client-1.9.0 I do not get this exception.
An example configuration for WildFly is shown on https://www.rabbitmq.com/jms-client.html, so even if this is actually a bug in the WildFly code, I am of the opinion that the RabbitMQ JMS Client should work around it (especially since the JMS Client worked seamlessly with WildFly prior to 1.10.0).

Support for metrics collection

Currently there is no way to configure the connection factory to turn metrics collection on, even though it is a feature that has been supported by the underlying rabbitmq/amqp connection factory.

Moreover, it would be preferable if the configuration can be done declaratively to facilitate setting them up as resources in JNDI. Presumably, some sort of dynamic class loading is needed for this to work.

SSLContext is overridden with default

Hi,

We plan to use the JMS connection in TIBCO BW 5.x. In BW, we define a file-based JNDI and use "com.sun.jndi.fscontext.RefFSContextFactory" as the InitialContext class to look up the JMS connection factory. Due to a limitation of TIBCO BW, we cannot call useSslProtocol() to configure the SSLContext manually. Would it be possible to introduce SSL properties on the JMS connection factory, such as keystore, truststore, and their passwords, and let the connection factory configure the SSLContext itself? We could then set those SSL properties in the .bindings file.

One more thing, about the createConnection(String username, String password) method of class com.rabbitmq.jms.admin.RMQConnectionFactory: in a test with Java code, I found that the SSLContext is overwritten by the default SSLContext even when it has been configured by maybeEnableTLS(). The reason is that the subsequent setRabbitUri() call resets the SSLContext. Would it be possible to swap the order of these two methods?

    maybeEnableTLS(factory);
    setRabbitUri(logger, this, factory, this.getUri());

Best Regards,
Luke

Message JMS properties are largely ignored

I am using the 1.6 client and I noticed that any message attribute I set, except JMSCorrelationID and JMSType, is ignored in favor of the message producer settings. My expectation, and my experience with other JMS clients (e.g. ActiveMQ), is that Message properties I set take precedence, unless a producer.send overload was used (and then only for the properties represented in the overload).

Is there a reason I am overlooking that this doesn't occur with the RMQ JMS client, or is this a defect?

Also, there is a comment in RMQMessage.copyAttributes that says JMSReplyTo is ignored completely. Same question: is there a reason, or is this a defect?

Thanks for looking into this.

Need a way to set consumer prefetch

After using this library, I ran into a limitation around prefetching. With the AMQP library, I used to set it by calling basicQos at the channel level.
Basically, I think that without this feature, throughput on my consumer side cannot be improved.

Uncatchable exception thrown when connecting with an invalid RMQDestination queue.

Hi, I've run into an issue: when I attempt to set up a message listener on an RMQDestination with an invalid destination name, the errors cannot be caught and instead get thrown, I am assuming from SLF4J. The core of the issue is that I need a way to see that an error has occurred on that connection, so that I can identify when an invalid queue has been entered. Currently the error gets posted, but only to our back-end logging, where it is posted multiple times (due to the persistent nature of AMQP), and it is uncatchable within the code.

I've tried forcing other errors, and everything else is caught either before or after the message listener is set up. An invalid destination name seems to be the only thing that causes an uncatchable error.

This is the exact error I am hitting.

https://github.com/rabbitmq/rabbitmq-jms-client/blob/master/src/main/java/com/rabbitmq/jms/client/RMQMessageConsumer.java

GetResponse getFromRabbitQueue() {
    String qN = rmqQueueName();
    try {
        return getSession().getChannel().basicGet(qN, false);
    } catch (Exception e) { // includes unchecked exceptions, e.g. ShutdownSignalException
        if (!(e instanceof ShutdownSignalException) && !(e.getCause() instanceof ShutdownSignalException)) {
            logger.error("basicGet for queue '{}' threw unexpected exception", qN, e);
        }
    }
    return null;
}

More tests required for the RMQDestination use cases

Tests present in this repository are a great source of examples for real world usage.
My case is the following:
write a TextMessage using the JMS Queue API (using RMQDestination()) -> AMQP Topic Exchange -> AMQP Queue -> read the TextMessage using the JMS Queue API (using RMQDestination())

What I've seen are just very simple tests in SimpleAmqpQueueMessageIT:

  • write a TextMessage using the JMS Queue API (using RMQDestination()) -> AMQP Queue -> read using an AMQP channel
  • write a TextMessage via an AMQP channel -> AMQP Queue -> read using the JMS Queue API (using RMQDestination())

Is there any chance to also cover the case I mentioned? I believe more people could benefit from it, and it would also cover more use cases in the tests.

Providing a somewhat simpler test would probably also make sense:

  • write a TextMessage using the JMS Queue API (using RMQDestination()) -> AMQP Queue -> read the TextMessage using the JMS Queue API (using RMQDestination())

Setting SSLContext still results in TrustEverythingTrustManager warning

When creating a connection using the RMQConnectionFactory, if you have passed your own SSLContext you will still get the warning:

14:50:54.068 [main] WARN com.rabbitmq.client.TrustEverythingTrustManager - SECURITY ALERT: this trust manager trusts every certificate, effectively disabling peer verification. This is convenient for local development but offers no protection against man-in-the-middle attacks. Please see https://www.rabbitmq.com/ssl.html to learn more about peer certificate verification.

This is not accurate, and it initially threw me off, as I was expecting it to use either:

  1. My own TrustManager specified via config
  2. The system's default TrustManager

When I ended up digging into it, I found that it was actually working properly, contrary to the warning message that was being logged. (This was rather annoying, because I wasn't sure whether it was my own trust manager, the system's or the TrustEverythingTrustManager that was making it work.)

It looks like this is because when you create a connection you call:

setRabbitUri(logger, this, cf, getUri());

Which then calls:

com.rabbitmq.client.ConnectionFactory#setUri(String uriString)

And due to "support the amqps scheme" and then "Don't blindly overwrite SSLContext when specifying URI", it will initialize the default TrustEverythingTrustManager, but then we override that when you subsequently call:

maybeEnableTLS(cf);

So, I believe this warning can be cleaned up by swapping those two lines (as the sslContext will properly be set by then), and then potentially adding a log / comment to:

RMQConnectionFactory#setUri(String uriString)

indicating that users can ignore the warnings generated from this, so as not to cause confusion, since the warning can be printed twice if your URI starts with amqps.
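To make the ordering argument concrete, here is a toy model of the two call orders. It only imitates the behaviour described in this issue (for amqps, setUri installs the trust-everything default only when no context has been set yet); ToyConnectionFactory and CallOrderDemo are hypothetical classes and are not the real RMQConnectionFactory or com.rabbitmq.client.ConnectionFactory.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: imitates the behaviour described in this issue, not library code.
final class ToyConnectionFactory {
    final List<String> warnings = new ArrayList<>();
    String sslContext; // null means "no TLS context configured yet"

    // Imitates setUri: for amqps, install the permissive default only if nothing is set.
    void setUri(String uri) {
        if (uri.startsWith("amqps") && sslContext == null) {
            sslContext = "trust-everything";
            warnings.add("SECURITY ALERT: this trust manager trusts every certificate");
        }
    }

    // Imitates maybeEnableTLS: apply the caller-supplied context, if there is one.
    void maybeEnableTls(String customContext) {
        if (customContext != null) {
            sslContext = customContext;
        }
    }
}

final class CallOrderDemo {
    private CallOrderDemo() {
    }

    // Current order: the URI is applied first, so the default is installed and a
    // warning is logged before the custom context replaces it.
    static ToyConnectionFactory uriFirst() {
        ToyConnectionFactory cf = new ToyConnectionFactory();
        cf.setUri("amqps://localhost");
        cf.maybeEnableTls("custom");
        return cf;
    }

    // Proposed order: the custom context is already in place when the URI is applied,
    // so the default is never installed and nothing is logged.
    static ToyConnectionFactory tlsFirst() {
        ToyConnectionFactory cf = new ToyConnectionFactory();
        cf.maybeEnableTls("custom");
        cf.setUri("amqps://localhost");
        return cf;
    }
}
```

In both orders the model ends up with the custom context, which matches the observation that the connection worked correctly despite the warning; only the misleading log entry differs.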

I mean, ideally it would be great if the rabbitmq-java-client didn't set that TrustEverythingTrustManager by default, but that's another story...

Do not declare reply-to destination for consumed messages

Reply-to destinations are typically used for RPC and the client is supposed to start listening on a reply-to destination before sending the request to the server. So once the server receives the request message, the reply-to already exists, so it's not necessary to try to create it when sending the response to it.

This seems a reasonable move that would fix #47 in a broader way than the introduced fix (which doesn't work when the developer doesn't have much control over the server, e.g. when using a Spring-based listener).

JMeter JMS Publisher usage

I'm trying to do some perf testing with JMeter JMS.
On the server I have rabbitmq_jms_topic_exchange plugin enabled.

What is the proper JMeter Initial Context Factory to set?
I tried the following:
[screenshot of the JMeter settings]

However, it ended in an (expected) exception:

javax.naming.NamingException: javax.naming.NoInitialContextException: Cannot instantiate class: com.rabbitmq.jms.admin.RMQConnectionFactory [Root exception is java.lang.ClassCastException: com.rabbitmq.jms.admin.RMQConnectionFactory cannot be cast to javax.naming.spi.InitialContextFactory]
	at org.apache.jmeter.protocol.jms.client.InitialContextFactory.lookupContext(InitialContextFactory.java:69) ~[ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.client.InitialContextFactory.getContext(InitialContextFactory.java:156) ~[ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.client.ReceiveSubscriber.<init>(ReceiveSubscriber.java:231) ~[ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.client.ReceiveSubscriber.<init>(ReceiveSubscriber.java:170) ~[ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.sampler.SubscriberSampler.initListenerClient(SubscriberSampler.java:119) ~[ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.sampler.SubscriberSampler.initClient(SubscriberSampler.java:346) [ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.protocol.jms.sampler.SubscriberSampler.threadStarted(SubscriberSampler.java:336) [ApacheJMeter_jms.jar:4.0 r1823414]
	at org.apache.jmeter.threads.JMeterThread$ThreadListenerTraverser.addNode(JMeterThread.java:676) [ApacheJMeter_core.jar:4.0 r1823414]
	at org.apache.jorphan.collections.HashTree.traverseInto(HashTree.java:994) [jorphan.jar:4.0 r1823414]
	at org.apache.jorphan.collections.HashTree.traverse(HashTree.java:977) [jorphan.jar:4.0 r1823414]
	at org.apache.jmeter.threads.JMeterThread.threadStarted(JMeterThread.java:644) [ApacheJMeter_core.jar:4.0 r1823414]
	at org.apache.jmeter.threads.JMeterThread.initRun(JMeterThread.java:632) [ApacheJMeter_core.jar:4.0 r1823414]
	at org.apache.jmeter.threads.JMeterThread.run(JMeterThread.java:246) [ApacheJMeter_core.jar:4.0 r1823414]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_121]

I don't see any InitialContextFactory implementation in the sources here. Or is the JMeter JMS sampler not usable at all here?
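For context, the ClassCastException above arises because JMeter expects the configured class to implement javax.naming.spi.InitialContextFactory. A minimal sketch of what such a class looks like, using only the JDK, is shown below. MapBackedContextFactory is a hypothetical name and is not shipped with rabbitmq-jms-client; it supports lookup and close only, and a stand-in object takes the place of a real connection factory.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.Context;
import javax.naming.NameNotFoundException;
import javax.naming.OperationNotSupportedException;
import javax.naming.spi.InitialContextFactory;

// Hypothetical class, not part of rabbitmq-jms-client. To be instantiated by name
// (as a tool like JMeter does), it would have to be declared public in its own file.
class MapBackedContextFactory implements InitialContextFactory {
    // Objects to expose through JNDI, keyed by lookup name.
    static final Map<String, Object> BINDINGS = new ConcurrentHashMap<>();

    @Override
    public Context getInitialContext(Hashtable<?, ?> environment) {
        // A dynamic proxy avoids spelling out every method of the Context interface.
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "lookup": {
                    // Covers both lookup(String) and lookup(Name).
                    String name = String.valueOf(args[0]);
                    Object bound = BINDINGS.get(name);
                    if (bound == null) {
                        throw new NameNotFoundException(name);
                    }
                    return bound;
                }
                case "close":
                    return null;
                case "hashCode":
                    return System.identityHashCode(proxy);
                case "equals":
                    return proxy == args[0];
                case "toString":
                    return "MapBackedContext" + BINDINGS.keySet();
                default:
                    throw new OperationNotSupportedException(method.getName());
            }
        };
        return (Context) Proxy.newProxyInstance(
                MapBackedContextFactory.class.getClassLoader(),
                new Class<?>[] {Context.class},
                handler);
    }
}
```

Before the tool performs its lookup, something would still have to put a configured RMQConnectionFactory and the destinations into BINDINGS; an existing JNDI provider, such as the file-system context mentioned in another issue on this page, may be the more practical route.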
