Comments (7)
To implement both mandatory and immediate, shouldn't we add the notion of a "ReturnListener" (as a ReturnStream or something like that)? Or maybe that is a separate feature?
My thinking is that using mandatory or immediate without checking the return is useless. What do you think?
from fs2-rabbit.
@cdelmas I'm not sure I follow. Both basicPublish methods return void in the Java driver. What do you mean by "checking the return"?
from fs2-rabbit.
When using the mandatory and/or immediate flags, you expect the server to return the message if it could not be routed. For that, the Java client's Channel lets you add a ReturnListener, so you can get the message back and handle it appropriately.
from fs2-rabbit.
Oh, I see now. I'm not sure how I'd go about it; I'll need to look into the API. With the Java driver one has to guess what to expect...
Going to sleep soon, but feel free to propose something. I'll try to read through the API in the next few days.
from fs2-rabbit.
@cdelmas just to let you know I'm working on this. Modifying the publishing algebra for now and registering the listeners on the channel seems to work. I still need to add tests, but I believe a PR will be coming soon.
I found the interface you were talking about:
/**
 * Implement this interface in order to be notified of failed
 * deliveries when basicPublish is called with "mandatory" or
 * "immediate" flags set.
 * For a lambda-oriented syntax, use {@link ReturnCallback}.
 * @see Channel#basicPublish
 */
public interface ReturnListener {
    void handleReturn(int replyCode,
                      String replyText,
                      String exchange,
                      String routingKey,
                      AMQP.BasicProperties properties,
                      byte[] body)
        throws IOException;
}
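One way to turn this callback into something pull-based, along the lines of the "ReturnStream" idea above, is to have the listener push each returned message onto a queue. This is only a sketch: ReturnListenerSketch is a local stand-in for the real interface (it leaves out AMQP.BasicProperties so the code compiles without the client library), and ReturnedMessage and QueueingReturnListener are hypothetical names, not part of fs2-rabbit or the Java client.

```java
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Local stand-in for com.rabbitmq.client.ReturnListener (assumption: the
// AMQP.BasicProperties argument is dropped to keep the sketch self-contained).
interface ReturnListenerSketch {
    void handleReturn(int replyCode, String replyText,
                      String exchange, String routingKey,
                      byte[] body) throws IOException;
}

// Hypothetical value type holding one returned message.
final class ReturnedMessage {
    final int replyCode;
    final String replyText;
    final String exchange;
    final String routingKey;
    final byte[] body;

    ReturnedMessage(int replyCode, String replyText,
                    String exchange, String routingKey, byte[] body) {
        this.replyCode = replyCode;
        this.replyText = replyText;
        this.exchange = exchange;
        this.routingKey = routingKey;
        this.body = body;
    }
}

// Hypothetical adapter: every returned message is appended to a queue that a
// consumer (for example a stream) can drain at its own pace.
final class QueueingReturnListener implements ReturnListenerSketch {
    private final BlockingQueue<ReturnedMessage> returned = new LinkedBlockingQueue<>();

    @Override
    public void handleReturn(int replyCode, String replyText,
                             String exchange, String routingKey, byte[] body) {
        returned.add(new ReturnedMessage(replyCode, replyText, exchange, routingKey, body));
    }

    // Non-blocking: yields null when nothing has been returned yet.
    ReturnedMessage poll() {
        return returned.poll();
    }
}
```

With the real client, a listener like this would presumably be registered through Channel#addReturnListener before publishing with mandatory set, and the queue drained by whatever consumes the returns.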
from fs2-rabbit.
More information that I'd like to add to the docs:
bit mandatory
This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
The server SHOULD implement the mandatory flag.
bit immediate
This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. If this flag is set, the server will return an undeliverable message with a Return method. If this flag is zero, the server will queue the message, but with no guarantee that it will ever be consumed.
The server SHOULD implement the immediate flag.
Source: https://www.rabbitmq.com/amqp-0-9-1-reference.html
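To make the two definitions easier to compare, here is a toy model of the decision the spec text describes for each flag. It is not broker or client code; PublishFlagsSketch and its Outcome names are made up for illustration, and each flag is modelled on its own because the quoted text does not say how the two interact.

```java
// Toy model of the quoted spec text; all names here are hypothetical.
final class PublishFlagsSketch {
    enum Outcome { RETURNED, DROPPED, QUEUED }

    // Message cannot be routed to any queue:
    // mandatory set -> returned to the publisher, otherwise silently dropped.
    static Outcome whenUnroutable(boolean mandatory) {
        return mandatory ? Outcome.RETURNED : Outcome.DROPPED;
    }

    // Message cannot be handed to a consumer immediately:
    // immediate set -> returned to the publisher, otherwise queued with no
    // guarantee that it will ever be consumed.
    static Outcome whenNoConsumer(boolean immediate) {
        return immediate ? Outcome.RETURNED : Outcome.QUEUED;
    }
}
```

In both cases the RETURNED outcome is the one a ReturnListener would observe; without a listener the publisher never learns about it.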
from fs2-rabbit.
While playing around with the immediate bit, I noticed that it is not implemented by RabbitMQ 3.x, as specified in the release notes and in this official announcement.
If it's set to true, the broker closes the connection and I get this error:
20:19:42.633 [scala-execution-context-global-101] ERROR c.github.gvolpe.fs2rabbit.util.Log$ - connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=540, reply-text=NOT_IMPLEMENTED - immediate=true, class-id=60, method-id=40)
    at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:253) ~[amqp-client-5.4.2.jar:5.4.2]
    at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:422) ~[amqp-client-5.4.2.jar:5.4.2]
    at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:704) ~[amqp-client-5.4.2.jar:5.4.2]
    at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:202) ~[amqp-client-5.4.2.jar:5.4.2]
    at com.github.gvolpe.fs2rabbit.interpreter.AMQPClientStream.$anonfun$basicPublishWithFlags$1(AMQPClientStream.scala:108) ~[fs2-rabbit_2.12-1.0-RC2.jar:1.0-RC2]
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library.jar:na]
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:85) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:336) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:357) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:303) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36) ~[cats-effect_2.12-1.0.0.jar:1.0.0]
    at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402) ~[na:1.8.0_161]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_161]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_161]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_161]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_161]
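Because the broker reacts by closing the whole connection, one option is to reject the flag on the client side before it ever reaches basicPublish. The guard below is a minimal sketch of that idea; ImmediateFlagGuard and requireSupported are hypothetical names, and whether fs2-rabbit should fail, warn, or simply drop the flag is a design decision this sketch does not settle.

```java
// Hypothetical client-side guard; not part of fs2-rabbit or the Java client.
final class ImmediateFlagGuard {
    // Fails fast instead of letting the broker close the connection with
    // reply-code 540 (NOT_IMPLEMENTED), as seen in the log above.
    static void requireSupported(boolean immediate) {
        if (immediate) {
            throw new IllegalArgumentException(
                "immediate=true is not implemented by RabbitMQ 3.x (540 NOT_IMPLEMENTED)");
        }
    }
}
```

Calling requireSupported(flag) just before publishing keeps the connection open and surfaces the problem as an ordinary exception at the call site.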
from fs2-rabbit.