Comments (7)

cdelmas commented on May 29, 2024

To implement both mandatory and immediate, shouldn't we add the notion of a "ReturnListener" (as a ReturnStream or something like that)? Or is that a separate feature?
My thinking: using mandatory or immediate without checking for returned messages is useless. What do you think?

from fs2-rabbit.

gvolpe commented on May 29, 2024

@cdelmas I'm not sure I follow. Both basicPublish methods return void in the Java driver. What do you mean by "checking the return"?

cdelmas commented on May 29, 2024

When you use the mandatory and/or immediate flags, you expect the server to return the message if it could not be routed (or, for immediate, could not be delivered to a consumer right away). For that, the Java client's Channel lets you add a ReturnListener, so you can get the message back and handle it appropriately.

gvolpe commented on May 29, 2024

Oh, I see now. I'm not sure how I'd go about it; I'll need to look into the API. With the Java driver one needs to guess what to expect...

I'm going to sleep soon, but feel free to propose something. I'll try to read through the API in the next few days.

gvolpe commented on May 29, 2024

@cdelmas just to let you know, I'm working on this. For now, modifying the publishing algebra and registering the listeners on the channel seems to work. I still need to add tests, but I believe a PR will be coming soon.

I found the interface you were talking about:

/**
 * Implement this interface in order to be notified of failed
 * deliveries when basicPublish is called with "mandatory" or
 * "immediate" flags set.
 * For a lambda-oriented syntax, use {@link ReturnCallback}.
 * @see Channel#basicPublish
 */
public interface ReturnListener {
    void handleReturn(int replyCode,
            String replyText,
            String exchange,
            String routingKey,
            AMQP.BasicProperties properties,
            byte[] body)
        throws IOException;
}
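
To make the point about "checking the return" concrete, here is a minimal, self-contained sketch of the callback flow. It does not use the RabbitMQ driver: `SimpleReturnListener` and `ToyChannel` are hypothetical stand-ins invented for this illustration (the listener drops the `AMQP.BasicProperties` argument so the code compiles on its own), and the toy channel only models the rule that an unroutable message published with mandatory=true is handed back to the listener. Reply code 312 (NO_ROUTE) is the AMQP 0-9-1 code for an unroutable message.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the driver's ReturnListener, without the
// AMQP.BasicProperties argument, so the sketch has no external dependency.
interface SimpleReturnListener {
    void handleReturn(int replyCode, String replyText, String exchange,
                      String routingKey, byte[] body) throws IOException;
}

// Toy in-memory channel (NOT the real Channel). It only models one rule:
// an unroutable message published with mandatory=true is returned.
class ToyChannel {
    private final Set<String> boundRoutingKeys = new HashSet<>();
    private final List<SimpleReturnListener> listeners = new ArrayList<>();

    void bind(String routingKey) {
        boundRoutingKeys.add(routingKey);
    }

    void addReturnListener(SimpleReturnListener listener) {
        listeners.add(listener);
    }

    // Returns void, like basicPublish: the listener is the only feedback path.
    void basicPublish(String exchange, String routingKey, boolean mandatory,
                      byte[] body) throws IOException {
        if (boundRoutingKeys.contains(routingKey)) {
            return; // routed to a queue, nothing to report
        }
        if (mandatory) {
            for (SimpleReturnListener listener : listeners) {
                listener.handleReturn(312, "NO_ROUTE", exchange, routingKey, body);
            }
        }
        // mandatory=false: the message is silently dropped
    }
}

class ReturnListenerDemo {
    // Publishes three messages and collects the ones that were returned.
    static List<String> run() {
        ToyChannel channel = new ToyChannel();
        channel.bind("orders");

        List<String> returned = new ArrayList<>();
        channel.addReturnListener((code, text, exchange, key, body) ->
            returned.add(key + ":" + new String(body, StandardCharsets.UTF_8)));

        try {
            channel.basicPublish("ex", "orders", true, bytes("a"));   // routed
            channel.basicPublish("ex", "missing", true, bytes("b"));  // returned
            channel.basicPublish("ex", "missing", false, bytes("c")); // dropped
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return returned;
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In this sketch only the second publish reaches the listener, so `run()` yields the single entry `missing:b`. The third message disappears without any signal, which is the reason a publisher that sets mandatory also needs a listener registered.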

gvolpe commented on May 29, 2024

More information that I'd like to add to the docs:

bit mandatory

This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.

The server SHOULD implement the mandatory flag.

bit immediate

This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.

The server SHOULD implement the immediate flag.

Source: https://www.rabbitmq.com/amqp-0-9-1-reference.html
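
The two flag descriptions above boil down to a small decision table, sketched below. The `Outcome` enum and the `PublishFlags` helper are hypothetical names made up for this illustration; they restate the quoted spec wording only and say nothing about whether a particular broker actually implements a flag. The two flags are kept as separate functions because the quoted text describes each one on its own and does not spell out how they combine.

```java
// Possible fates of a published message, per the quoted flag descriptions.
enum Outcome { RETURNED, DROPPED, QUEUED }

// Hypothetical helper that restates the spec text; not part of any driver.
final class PublishFlags {
    private PublishFlags() {}

    // Case: the message cannot be routed to a queue.
    // mandatory set  -> server returns it with a Return method
    // mandatory zero -> server silently drops it
    static Outcome whenUnroutable(boolean mandatory) {
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    // Case: the message cannot be routed to a queue consumer immediately.
    // immediate set  -> server returns it with a Return method
    // immediate zero -> server queues it, with no guarantee of consumption
    static Outcome whenNoConsumerReady(boolean immediate) {
        return immediate ? Outcome.RETURNED : Outcome.QUEUED;
    }
}

class PublishFlagsDemo {
    public static void main(String[] args) {
        System.out.println("unroutable, mandatory=true   -> " + PublishFlags.whenUnroutable(true));
        System.out.println("unroutable, mandatory=false  -> " + PublishFlags.whenUnroutable(false));
        System.out.println("no consumer, immediate=true  -> " + PublishFlags.whenNoConsumerReady(true));
        System.out.println("no consumer, immediate=false -> " + PublishFlags.whenNoConsumerReady(false));
    }
}
```

Both `RETURNED` cases are the ones a ReturnListener would observe; `DROPPED` and `QUEUED` give the publisher no signal at all.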

gvolpe commented on May 29, 2024

I played around with setting the immediate bit and noticed that RabbitMQ 3.x does not implement it, as stated in the release notes and in this official announcement.

If it's set to true, the broker closes the connection and I get this error:

20:19:42.633 [scala-execution-context-global-101] ERROR c.github.gvolpe.fs2rabbit.util.Log$ - connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)
	at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[amqp-client-5.4.2.jar:5.4.2]
	at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[amqp-client-5.4.2.jar:5.4.2]
	at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:704) ~[amqp-client-5.4.2.jar:5.4.2]
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:202) ~[amqp-client-5.4.2.jar:5.4.2]
	at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream.$anonfun$basicPublishWithFlags$1(AMQPClientStream.scala:108) ~[fs2-rabbit_2.12-1.0-RC2.jar:1.0-RC2]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library.jar:na]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:85) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:357) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:303) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[na:1.8.0_161]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_161]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_161]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_161]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_161]
