fs2-rabbit's People

Contributors

agustafson, akreuzer, aywengo, bpholt, catostrophe, cdelmas, changlinli, codacy-badger, daenyth, dougc, felixmulder, gvolpe, insdami, irevive, jacemale, jbwheatley, ksonj, kubukoz, ldip, matejcerny, mergify[bot], nikiforo, orium, poohsen, retriku, scala-steward, scalolli, simpadjo, szymonm, wookievx

fs2-rabbit's Issues

purity issue in Fs2RabbitConfig

Hi,

The documentation suggests using PureConfig to load the configuration. However, there doesn't seem to be a pure, referentially transparent way to create the SSLContext that is needed for that; all the static factory methods to create an SSLContext throw exceptions or return null. Users are thus likely to end up with code like

  implicit val sslContextReader: ConfigReader[SSLContext] =
    ConfigReader[String].map(SSLContext.getInstance)

which performs side effects in an uncontrolled manner. Would you agree that this is an issue? There are probably a number of ways to tackle this, but before discussing them I'd first like to reach a consensus that, in an environment where side effects are generally visible in the types, this is not ideal behaviour.
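A minimal sketch of one way to make the failure explicit, assuming it is acceptable to surface errors as an Either value. SafeSslContext and fromProtocol are hypothetical names, not part of fs2-rabbit or PureConfig. This only captures the exception; a fully pure version would suspend the call in an effect type.

```scala
import javax.net.ssl.SSLContext
import scala.util.Try

object SafeSslContext {
  // SSLContext.getInstance throws for unknown or null protocol names;
  // Try captures that and toEither exposes it in the return type.
  def fromProtocol(protocol: String): Either[Throwable, SSLContext] =
    Try(SSLContext.getInstance(protocol)).toEither
}
```

A function of this shape could be adapted to a config reader that reports failures instead of throwing.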

Release 1.0-RC5

Hi @gvolpe,

we need the ability to deal with binary messages in our program, which is easy with our recent changes (EnvelopeDecoder). Could you make a 1.0-RC5 release?

// edit: thinking about it, this isn't very urgent. We can publish a release on our internal Nexus, so no need to hurry.

Try to fix the CI build

There's a problem with the dependency on Apache Qpid Broker and Logback. I've tried a few different combinations but they all failed. However, the current configuration runs locally without problems, so I don't know exactly how to fix it.

[info] Exception encountered when attempting to run a suite with class name: com.github.gvolpe.fs2rabbit.Fs2RabbitSpec *** ABORTED ***
[info]   java.lang.ClassCastException: org.slf4j.helpers.SubstituteLogger cannot be cast to ch.qos.logback.classic.Logger
[info]   at org.apache.qpid.server.Broker$1.run(Broker.java:147)
[info]   at java.security.AccessController.doPrivileged(Native Method)
[info]   at javax.security.auth.Subject.doAs(Subject.java:422)
[info]   at org.apache.qpid.server.Broker.startup(Broker.java:142)
[info]   at com.github.gvolpe.fs2rabbit.embedded.EmbeddedAmqpBroker$.start(EmbeddedAmqpBroker.scala:16)
[info]   at com.github.gvolpe.fs2rabbit.Fs2RabbitSpec.beforeAll(Fs2RabbitSpec.scala:19)
[info]   at org.scalatest.BeforeAndAfterAll$class.beforeAll(BeforeAndAfterAll.scala:187)
[info]   at com.github.gvolpe.fs2rabbit.Fs2RabbitSpec.beforeAll(Fs2RabbitSpec.scala:14)
[info]   at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:253)
[info]   at com.github.gvolpe.fs2rabbit.Fs2RabbitSpec.run(Fs2RabbitSpec.scala:14)

Consuming a message throws NullPointerException if AMQP.BasicProperties.getHeaders is null

The defaultConsumer in UnderlyingAmqpClient does not handle the case when properties.getHeaders in handleDelivery returns null.
In this case an exception like the following is thrown.

(ERROR) com.rabbitmq.client.impl.ForgivingExceptionHandler: Consumer com.github.gvolpe.fs2rabbit.UnderlyingAmqpClient$$anon$1@1a3324af (amq.ctag-Grspu7yRG6e2rQ8J-8NqnA) method handleDelivery for channel AMQChannel(amqp://guest@xxx:5672/,1) threw an exception for channel AMQChannel(amqp://guest@xxx:5672/,1) java.lang.NullPointerException
	at com.github.gvolpe.fs2rabbit.model$AmqpProperties$.from(model.scala:67)
	at com.github.gvolpe.fs2rabbit.UnderlyingAmqpClient$$anon$1.handleDelivery(Fs2Rabbit.scala:51)
	at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
	at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:100)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

To create such a message I used amqp-publish in the following way: amqp-publish -u amqp://guest:guest@xxx:5672 -e "" -r daQ
The AMQP.BasicProperties I got with this were as follows.

#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
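A possible null-safe conversion is sketched below, assuming Scala 2.13 or later for scala.jdk.CollectionConverters. SafeHeaders and toScala are hypothetical names; the java.util.Map parameter stands in for the value returned by properties.getHeaders.

```scala
import java.util.{Map => JMap}
import scala.jdk.CollectionConverters._

object SafeHeaders {
  // Option(...) turns a null reference into None, so a message
  // without headers yields an empty Map instead of an NPE.
  def toScala(headers: JMap[String, AnyRef]): Map[String, AnyRef] =
    Option(headers).fold(Map.empty[String, AnyRef])(_.asScala.toMap)
}
```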

Implement exchangeUnbind(String destination, String source, String routingKey)

Bear in mind that fs2-rabbit is just a wrapper for the AMQP Java client. So, whenever adding a new method that is supported by the native client, the procedure will be more or less the same:

  1. Check the signatures in the java doc of the official java client:
    https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
  2. For this case, the return type is AMQP.Exchange.UnbindOk so in fs2-rabbit it will be Stream[F, Exchange.UnbindOk]
  3. Add the new method at the end of the Fs2Rabbit file.
  4. Add scala doc comments to the new method. Take a look at the existent methods but in general it could be more or less similar to the Java client docs. This is the one for this method: "Unbind an exchange from an exchange, with no extra arguments."
  5. Add a unit test in the file Fs2RabbitSpec: I can help you out with this part since it's not as trivial as adding the new method and sometimes it might be okay to just invoke the method on an existent test instead of creating a new one.

HINT: Look at the existent method bindQueue, this one will be very similar ;)

Thanks for contributing!

Consider making AmqpEnvelope generic

I noticed that the AmqpMessage class is generic, but the AmqpEnvelope class isn't. Is there a specific reason for that? It seems to me that making AmqpEnvelope generic would be quite useful, since one often needs to pass around the payload together with the metadata (e.g. to Ack or Nack a message later).
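To illustrate the idea, here is a rough sketch of what a generic envelope could look like. Envelope and its two fields are hypothetical simplifications, not the library's actual AmqpEnvelope definition.

```scala
object GenericEnvelope {
  // The payload type is a parameter, so decoding the body keeps the
  // delivery tag needed to Ack or Nack the message later.
  final case class Envelope[A](deliveryTag: Long, payload: A) {
    def map[B](f: A => B): Envelope[B] = Envelope(deliveryTag, f(payload))
  }
}
```

With this shape a consumer could decode the payload and still hold on to the metadata in a single value.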

Implement exchangeBind(String destination, String source, String routingKey)

Bear in mind that fs2-rabbit is just a wrapper for the AMQP Java client. So, whenever adding a new method that is supported by the native client, the procedure will be more or less the same:

  1. Check the signatures in the java doc of the official java client:
    https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
  2. For this case, the return type is AMQP.Exchange.BindOk so in fs2-rabbit it will be Stream[F, Exchange.BindOk]
  3. Add the new method at the end of the Fs2Rabbit file.
  4. Add scala doc comments to the new method. Take a look at the existent methods but in general it could be more or less similar to the Java client docs. This is the one for this method: "Bind an exchange to an exchange, with no extra arguments."
  5. Add a unit test in the file Fs2RabbitSpec: I can help you out with this part since it's not as trivial as adding the new method and sometimes it might be okay to just invoke the method on an existent test instead of creating a new one.

HINT: Look at the existent method bindQueue, this one will be very similar ;)

Thanks for contributing!

Add support for different types of basicQos

It's currently hardcoded in the library to 1 without wrapping the function in an effect (it could throw an exception).

This will involve adding support for different types of basicQos:

  • basicQos(int prefetchCount)
  • basicQos(int prefetchCount, boolean global)
  • basicQos(int prefetchSize, int prefetchCount, boolean global)

The existent methods to create a consumer should also accept a parameter of type:

case class BasicQosConfig(prefetchCount: Int, prefetchSize: Option[Int], global: Option[Boolean])

See docs: https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
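One way the proposed BasicQosConfig could be mapped onto the three overloads is sketched below. QosChannel is a hypothetical stand-in for the basicQos methods on the Java client's Channel, applyQos is an invented name, and defaulting global to false when only prefetchSize is given is an assumption. Errors are captured in an Either here; an effect type could be used instead.

```scala
import scala.util.Try

object BasicQosSketch {
  final case class BasicQosConfig(prefetchCount: Int, prefetchSize: Option[Int], global: Option[Boolean])

  // Stand-in for the three basicQos overloads listed above.
  trait QosChannel {
    def basicQos(prefetchCount: Int): Unit
    def basicQos(prefetchCount: Int, global: Boolean): Unit
    def basicQos(prefetchSize: Int, prefetchCount: Int, global: Boolean): Unit
  }

  // Picks the overload that matches the options present in the config
  // and captures any exception thrown by the underlying call.
  def applyQos(channel: QosChannel, config: BasicQosConfig): Either[Throwable, Unit] =
    Try {
      (config.prefetchSize, config.global) match {
        case (None, None)         => channel.basicQos(config.prefetchCount)
        case (None, Some(global)) => channel.basicQos(config.prefetchCount, global)
        case (Some(size), global) => channel.basicQos(size, config.prefetchCount, global.getOrElse(false))
      }
    }.toEither
}
```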

Replace EmbeddedBroker for some monadic interpreter

The current unit test implementation uses a custom EmbeddedAmqpBroker that simulates RabbitMQ. This solution is very buggy and leads to random test failures, which is why I want to get rid of it.

At the moment I don't have a solution, but I'm currently experimenting with an interpreter that works over IndexedStateT[Stream[F, ?], SA, SB, A] instead of directly over Stream[F, ?]. With this signature it's easier to reason about the program, since one cannot bind a queue without having declared it first, though the final user code becomes more cumbersome.

I'll leave this issue open until I (or someone else) can come up with a better solution.

Make it multi-module to have the examples separated

The idea is to have two modules:

  • core: the library itself
  • examples: move the existing IO demo from the library and create some more examples (See #3 and #4)

Also remove the logback dependency from the core.

Finally, only deploy the core to Nexus.

Add support for different types of exchangeDeclare

Currently the library implements only one of these flavors, but there are more to add:

  • exchangeDeclare(String exchange, BuiltinExchangeType type) [IMPLEMENTED (*)]
  • exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable)
  • exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
  • exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, Map<String,Object> arguments)
  • exchangeDeclareNoWait(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String,Object> arguments)
  • exchangeDeclarePassive(String name)

Check the signatures in the java doc of the official java client:
https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html

(*) Fs2Rabbit#L164

Make the effect type generic

Right now the implementation is tied to cats.effect.IO. Make all the public methods generic using cats.effect.Effect[F[_]].

Add support for different types of basicPublish

Implement these methods:

  • basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body) [IMPLEMENTED]
  • basicPublish(String exchange, String routingKey, boolean mandatory, AMQP.BasicProperties props, byte[] body)
  • basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, AMQP.BasicProperties props, byte[] body)

Check the signatures in the java doc of the official java client:
https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/Channel.html

Unsafe arguments

Arguments that are passed through QueueConfig could potentially throw a runtime exception:

ERROR c.g.gvolpe.fs2rabbit.instances.log$ - invalid value in table
java.lang.IllegalArgumentException: invalid value in table
	at com.rabbitmq.client.impl.Frame.fieldValueSize(Frame.java:310) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.Frame.tableSize(Frame.java:250) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.ValueWriter.writeTable(ValueWriter.java:119) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.MethodArgumentWriter.writeTable(MethodArgumentWriter.java:138) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQImpl$Queue$Declare.writeArgumentsTo(AMQImpl.java:1496) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.Method.toFrame(Method.java:85) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:104) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:363) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:345) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.quiescingRpc(AMQChannel.java:280) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:271) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:233) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:128) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:948) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.queueDeclare(AutorecoveringChannel.java:333) ~[amqp-client-4.1.0.jar:4.1.0]
	at com.github.gvolpe.fs2rabbit.interpreter.AmqpClientStream.$anonfun$declareQueue$1(AmqpClientStream.scala:138) ~[classes/:na]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library.jar:na]
	at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:74) ~[cats-effect_2.12-0.8.jar:0.8]
	at cats.effect.internals.IORunLoop$.start(IORunLoop.scala:34) ~[cats-effect_2.12-0.8.jar:0.8]
	at cats.effect.IO.unsafeRunAsync(IO.scala:191) ~[cats-effect_2.12-0.8.jar:0.8]
	at cats.effect.IO.$anonfun$runAsync$1(IO.scala:150) ~[cats-effect_2.12-0.8.jar:0.8]
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) ~[scala-library.jar:na]
	at cats.effect.internals.IORunLoop$.step(IORunLoop.scala:154) ~[cats-effect_2.12-0.8.jar:0.8]
	at cats.effect.IO.unsafeRunTimed(IO.scala:220) ~[cats-effect_2.12-0.8.jar:0.8]
	at cats.effect.IO.unsafeRunSync(IO.scala:173) ~[cats-effect_2.12-0.8.jar:0.8]
	at com.github.gvolpe.fs2rabbit.examples.IOApp.main(IOApp.scala:23) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.examples.IOApp.main$(IOApp.scala:23) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.examples.IOAckerConsumer$.main(IOAckerConsumer.scala:26) ~[classes/:na]
	at com.github.gvolpe.fs2rabbit.examples.IOAckerConsumer.main(IOAckerConsumer.scala) ~[classes/:na]

Maybe we can create existential types to validate types at compile time using this as a reference
https://github.com/rabbitmq/rabbitmq-java-client/blob/master/src/main/java/com/rabbitmq/client/impl/Frame.java#L256
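As a simpler alternative to existential types, a closed set of argument values could rule out unsupported types at compile time. The sketch below assumes Scala 2.13 or later; SafeArg and its cases are hypothetical names covering only a few value types, not the complete set of types the client accepts.

```scala
import scala.jdk.CollectionConverters._

sealed trait SafeArg { def boxed: AnyRef }

object SafeArg {
  // Each case wraps one value type and boxes it for the Java API.
  final case class Str(value: String)      extends SafeArg { def boxed: AnyRef = value }
  final case class IntArg(value: Int)      extends SafeArg { def boxed: AnyRef = Int.box(value) }
  final case class LongArg(value: Long)    extends SafeArg { def boxed: AnyRef = Long.box(value) }
  final case class BoolArg(value: Boolean) extends SafeArg { def boxed: AnyRef = Boolean.box(value) }

  // Converts to the Map<String, Object> shape that queueDeclare expects.
  def toJava(args: Map[String, SafeArg]): java.util.Map[String, AnyRef] =
    args.map { case (key, arg) => key -> arg.boxed }.asJava
}
```

Because SafeArg is sealed, a value of an arbitrary type cannot be passed without first choosing one of the known cases.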

Fs2 Rabbit includes `logback.xml`, which will conflict with projects also including that file

A project depending on Fs2 Rabbit that uses logback.xml will fail to assemble:

$ sbt assembly
...
[error] 1 error was encountered during merge
[error] java.lang.RuntimeException: deduplicate: different file contents found in the following:                                                             
[error] logback.xml
[error] /home/orium/.ivy2/cache/com.github.gvolpe/fs2-rabbit_2.12/jars/fs2-rabbit_2.12-0.5.jar:logback.xml                       
...

Failover to next node if current node becomes unavailable

Hey @gvolpe ,
while evaluating your very promising library for use in a new service, I noticed that specifying multiple nodes of the same RabbitMQ cluster does not seem to be supported. The expectation would be to have the service automatically establish a connection to node B in case node A becomes unreachable, if both node A and node B are specified in the configuration.

I tried to work around this by defining multiple Fs2RabbitConfigs, instantiating one Fs2Rabbit[F] for each of them and then calling my own createConnectionChannel() to pick the first successful connection:

object Fs2RabbitUtil {

  private val logger = getLogger

  def createConnectionChannel[F[_]: Sync](
      nodes: List[Fs2Rabbit[F]]
  ): Stream[F, (Fs2Rabbit[F], AMQPChannel)] =
    nodes match {
      case node :: otherNodes =>
        (
          for {
            channel <- node.createConnectionChannel
            _       <- Stream.eval(Sync[F].delay(logger.info("Connected to RabbitMQ node.")))
          } yield (node, channel)
        ) handleErrorWith {
          case NonFatal(e) =>
            for {
              _       <- Stream.eval(Sync[F].delay(logger.warn(e)(s"Connection attempt to RabbitMQ node failed.")))
              channel <- createConnectionChannel(otherNodes)
            } yield channel
        }
      case List() => Stream.raiseError[F](new Exception(s"Connection attempts to all RabbitMQ nodes failed."))
    }
}

In connection with ResilientStream.run() I would have expected this to be enough to fail the stream in case a node becomes unreachable and then createConnectionChannel would pick the next node and restart the stream. However amqp-client only knows about one node and since automatic recovery is not disabled, amqp-client will try to reconnect to the same node over and over again instead of letting the stream fail.

There are two possible solutions to support failover that I can think of:

  • Disable automatic recovery in amqp-client's ConnectionFactory and let the stream fail. Users of the library would then have to handle the failover themselves.
  • Support multiple nodes in Fs2RabbitConfig and pass them on to amqp-client, e.g.:
    Fs2RabbitConfig(
        nodes = List(
            Fs2RabbitNodeConfig(
                host = "host1.example.net",
                port = 5672
            ),
            Fs2RabbitNodeConfig(
                host = "host2.example.net",
                port = 5672
            )
        ),
        virtualHost = "/",
        connectionTimeout = 10,
        ssl = false,
        username = None,
        password = None,
        requeueOnNack = true,
        internalQueueSize = None
     )
    amqp-client already supports configuring multiple nodes and will fall back to the other nodes automatically.
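The second option might look roughly like the sketch below, which relies on the Java client's Address class and the ConnectionFactory.newConnection overload that accepts a list of addresses. NodeConfig, toAddresses and connect are hypothetical names, and Scala 2.13 or later is assumed for the collection converters.

```scala
import com.rabbitmq.client.{Address, Connection, ConnectionFactory}
import scala.jdk.CollectionConverters._

object MultiNodeConnection {
  // Hypothetical node description mirroring the proposed Fs2RabbitNodeConfig.
  final case class NodeConfig(host: String, port: Int)

  // Converts the configured nodes into the address list the client expects.
  def toAddresses(nodes: List[NodeConfig]): java.util.List[Address] =
    nodes.map(node => new Address(node.host, node.port)).asJava

  // Opens a connection using the given node list.
  // Requires a reachable broker, so it is not exercised here.
  def connect(factory: ConnectionFactory, nodes: List[NodeConfig]): Connection =
    factory.newConnection(toAddresses(nodes))
}
```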

Consumer should be treated as a resource

Hi @gvolpe, thank you for the great RabbitMQ client.

I found unexpected behavior with consumers: the completion of a stream does not terminate the consumer. "Program 1" will consume one published message, but "Program 2" will not, because the consumer from "Program 1" is still attached to RabbitMQ.
Below you can find the full example.

object Fs2RabbitPlayground extends IOApp {

  override def run(args: List[String]): IO[ExitCode] = {
    val rabbitIO = Fs2Rabbit[IO](
      Fs2RabbitConfig("localhost", 53125, "test", 3, false, None, Some("admin"), Some("admin"), false)
    )

    val queue = QueueName("my-queue")

    Stream
      .eval(rabbitIO)
      .flatMap { implicit rabbit =>
        rabbit.createConnectionChannel.flatMap { implicit channel =>
          rabbit.declareQueue(DeclarationQueueConfig.default(queue)).flatMap { _ =>
            val routingKey = RoutingKey(queue.value)

            program("Program 1", routingKey, queue)
            program("Program 2", routingKey, queue)

            Stream.empty
          }
        }
      }
      .compile
      .drain
      .map(_ => ExitCode.Success)
  }

  def program(name: String, routingKey: RoutingKey, queueName: QueueName)(implicit rabbit: Fs2Rabbit[IO],
                                                                          channel: AMQPChannel): Unit = {
    val flow = for {
      _ <- publish(s"message $name", routingKey)
      result <- consumer(s"consumer $name", queueName)
    } yield result

    val result = flow.take(1).compile.drain.unsafeRunTimed(5.seconds)
    println(s"[$name] completed. Result $result")
  }

  def publish(value: String, routingKey: RoutingKey)(implicit rabbit: Fs2Rabbit[IO], channel: AMQPChannel): Stream[IO, Unit] = {
    for {
      publisher <- rabbit.createPublisher(ExchangeName(""), routingKey)
      message = AmqpMessage(value, AmqpProperties.empty)
      _ <- Stream(message).covary[IO] to publisher
      _ = println(s"Message [$message] published to $routingKey")
    } yield ()
  }

  def consumer(name: String, queue: QueueName)(implicit rabbit: Fs2Rabbit[IO], channel: AMQPChannel): Stream[IO, AmqpEnvelope] = {
    for {
      ackConsumer <- rabbit.createAutoAckConsumer(queue)
      envelope    <- ackConsumer
    } yield envelope
  }

}

From my perspective, a consumer should be treated as a resource. Thus allocation of a consumer can be similar to:

Stream.resource(
  AMQP.basicConsume(channel, queueName, autoAck, consumerTag, noLocal, exclusive, args)(internals)
)(consumerTag => AMQP.basicCancel(consumerTag))

I have added a 'basicCancel' method as part of #134.
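The acquire/release idea can be illustrated without any effect library. In this sketch ConsumerBracket and withConsumer are hypothetical names; subscribe stands in for basicConsume (returning the consumer tag) and cancel stands in for basicCancel. A real implementation would presumably use the resource support of the effect library rather than try/finally.

```scala
object ConsumerBracket {
  // Subscribes, runs `use` with the consumer tag, and always cancels
  // the consumer afterwards, even if `use` throws.
  def withConsumer[A](subscribe: () => String)(cancel: String => Unit)(use: String => A): A = {
    val consumerTag = subscribe()
    try use(consumerTag)
    finally cancel(consumerTag)
  }
}
```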

Termination of RabbitMQ ignored by the connection

Reproducing flow:

  1. Start RabbitMQ;
  2. Open connection to RabbitMQ;
  3. Stop RabbitMQ;
  4. No error produced by a connection stream;

The RabbitMQ client provides an automatic recovery mechanism, but I'm not sure that it will be very useful.
From my perspective, recovery management should be done manually:

  1. factory.setAutomaticRecoveryEnabled(false)
  2. connection.addShutdownListener(shutdownCause => { recovery logic? })

Right now I have no idea how it can be implemented, because the shutdown listener is async. If you can suggest a possible solution, I can try to work on it.
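One possible way to deal with the asynchronous listener is to complete a promise from the callback, so that the shutdown becomes a value the rest of the program can wait on. In the sketch below ShutdownBridge and shutdownSignal are hypothetical names, and register stands in for something like connection.addShutdownListener; with an effect library the same shape would probably be expressed with its async primitive.

```scala
import scala.concurrent.{Future, Promise}

object ShutdownBridge {
  // Registers a callback that completes the promise the first time it
  // fires; later invocations are ignored by trySuccess.
  def shutdownSignal[E](register: (E => Unit) => Unit): Future[E] = {
    val promise = Promise[E]()
    register { cause => promise.trySuccess(cause); () }
    promise.future
  }
}
```

The recovery logic could then be attached to the returned Future instead of living inside the listener.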

Implement exchangeDelete(String exchange)

Bear in mind that fs2-rabbit is just a wrapper for the AMQP Java client. So, whenever adding a new method that is supported by the native client, the procedure will be more or less the same:

  1. Check the signatures in the java doc of the official java client:
    https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
  2. For this case, the return type is AMQP.Exchange.DeleteOk so in fs2-rabbit it will be Stream[F, Exchange.DeleteOk]
  3. Add the new method at the end of the Fs2Rabbit file.
  4. Add scala doc comments to the new method. Take a look at the existent methods but in general it could be more or less similar to the Java client docs. This is the one for this method: "Delete an exchange, without regard for whether it is in use or not"
  5. Add a unit test in the file Fs2RabbitSpec: I can help you out with this part since it's not as trivial as adding the new method and sometimes it might be okay to just invoke the method on an existent test instead of creating a new one.

HINT: Look at the existent method bindQueue, this one will be very similar ;)

Thanks for contributing!

Allow multiple channels per connection

ConnectionStream.createConnectionChannel creates both a connection and a channel and there doesn't seem to be a way of creating multiple channels over a single connection, unless I'm missing something. According to https://stackoverflow.com/questions/18418936/rabbitmq-and-relationship-between-channel-and-connection, the rabbitmq model favours an approach of creating multiple channels within an application over a single connection to the server to reduce the amount of resources used. This is also important in my application where we're connecting to a Rabbit-as-a-service server which has a limited number of connections available.

It would be great to have some mechanism to create additional channels on an open connection.

1.0 Release blocker? Suspicious code in ConnectionStream class

Hi,

I noticed some code in the ConnectionStream class today:

  private[fs2rabbit] lazy val connFactory: F[ConnectionFactory] =
    F.delay {
        …
    }

  private[fs2rabbit] def acquireConnection: F[(RabbitMQConnection, AMQPChannel)] =
    for {
      factory <- connFactory
      conn    <- F.delay(factory.newConnection)
      channel <- F.delay(conn.createChannel)
    } yield (conn, RabbitChannel(channel))

  override def createConnectionChannel: Stream[F, AMQPChannel] =

connFactory is a lazy val, while acquireConnection and createConnectionChannel are defs without parameters. This seems odd to me. defs without parameters are equivalent to vals when you do functional programming (cf. John de Goes), so why not make everything a val and be more efficient?

To me, this looks like the intention was to lazily initialize the ConnectionFactory exactly once when a connection is first acquired. But what this code actually does is create a new ConnectionFactory every time a connection is acquired. I would guess that that's not what the intention was, right? You can easily check this by e. g. adding some print statements in connFactory's F.delay block.

I would like to point out that it is probably impossible to fix this issue correctly without changing the signature of Fs2Rabbit.apply. It currently returns Fs2Rabbit[F], but it will have to return F[Fs2Rabbit[F]]. This kind of initialization (whether lazy or non-lazy) ultimately requires some form of mutable state, and creating a mutable variable is in itself a side effect, as can be seen e.g. from the signature of cats.effect.concurrent.Ref.of, which returns F[Ref[F, A]] rather than Ref[F, A]. So this is something that needs to be taken care of before the 1.0 release, which I understand is imminent.
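The behaviour can be reproduced without cats-effect. In this sketch Delayed[A] (a plain () => A) is a stand-in for F[A] built with F.delay, and LazyValDemo, Factory and memoize are hypothetical names. The lazy val only caches the description of the computation, so every run allocates a new factory. Note that memoize allocates hidden mutable state, which is exactly why a pure API would have to return the cached value inside an effect.

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyValDemo {
  type Delayed[A] = () => A // stand-in for F[A] created via F.delay
  final class Factory

  val allocations = new AtomicInteger(0)

  // The lazy val caches the thunk, not its result:
  // each invocation still builds a new Factory.
  lazy val connFactory: Delayed[Factory] = () => {
    allocations.incrementAndGet()
    new Factory
  }

  // Caches the result of the first run; creating this cache is
  // itself a side effect.
  def memoize[A](fa: Delayed[A]): Delayed[A] = {
    lazy val cached = fa()
    () => cached
  }
}
```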

Support declarative DSL style

Explore the idea of adding a DSL-style API, using the builder pattern as other libraries do.

Here's an example of how it's done now:

def p1(R: Fs2Rabbit[IO]) =
  R.createConnectionChannel.use { implicit channel =>
    R.declareExchange(ex, ExchangeType.Topic) *>
    R.declareQueue(DeclarationQueueConfig.default(q1)) *>
    R.bindQueue(q1, ex, rk) *>
    R.createAutoAckConsumer[String](q1)
  }

By introducing a DSL we should be able to do it in this way:

def p1(client: Fs2Rabbit[IO]) =
  client.createConnectionChannel.use { implicit channel =>
    client
      .declareExchange(ex, ExchangeType.Topic)
      .declareQueue(DeclarationQueueConfig.default(q1)) 
      .bindQueue(q1, ex, rk) *>
    client.createAutoAckConsumer[String](q1)
  }

Both ways should be equivalent but probably the DSL style is more friendly.

Essentially any method returning F[Unit] could be added to the DSL.

Implement queueDelete(String queue)

Bear in mind that fs2-rabbit is just a wrapper for the AMQP Java client. So, whenever adding a new method that is supported by the native client, the procedure will be more or less the same:

  1. Check the signatures in the java doc of the official java client:
    https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
  2. For this case, the return type is AMQP.Queue.DeleteOk so in fs2-rabbit it will be Stream[F, Queue.DeleteOk]
  3. Add the new method at the end of the Fs2Rabbit file.
  4. Add scala doc comments to the new method. Take a look at the existent methods but in general it could be more or less similar to the Java client docs. This is the one for this method: "Delete a queue, without regard for whether it is in use or has messages on it"
  5. Add a unit test in the file Fs2RabbitSpec: I can help you out with this part since it's not as trivial as adding the new method and sometimes it might be okay to just invoke the method on an existent test instead of creating a new one.

HINT: Look at the existent method bindQueue, this one will be very similar ;)

Thanks for contributing!

Stable dependencies and a temporary fork of fs2-rabbit

Hi @gvolpe,

We are using fs2-rabbit in production, and because of that we don't want to use unstable libraries, which version 0.8 would currently force us to do (cats-effect, fs2, and circe). We have therefore forked your project at https://github.com/ITV/fs2-rabbit. That fork is basically the same thing as your project but with stable versions of the dependencies (and with #76 landed).

We don't want this to be a permanent solution. Ideally you would provide a stable version of fs2-rabbit that we could use.

Thanks!

Add support for different types of queueDeclare

Currently the library implements only one of these flavors, but there are more to add:

  • queueDeclare()
  • queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments) [PARTIALLY IMPLEMENTED(*)]
  • queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,Object> arguments)
  • queueDeclarePassive(String queue)

Check the signatures in the java doc of the official java client:
https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html

(*) Fs2Rabbit#L178 uses this method but doesn't expose all the options, need to model it.

Create a single ConnectionFactory

Even when invoking createConnectionChannel multiple times, it seems unnecessary to create more than one ConnectionFactory, as it internally has a "shared executor" and performs many other allocations when created.

For more information see the discussion on #118

Implement queueUnbind(String queue, String exchange, String routingKey)

Bear in mind that fs2-rabbit is just a wrapper for the AMQP Java client. So, whenever adding a new method that is supported by the native client, the procedure will be more or less the same:

  1. Check the signatures in the java doc of the official java client:
    https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
  2. For this case, the return type is AMQP.Queue.UnbindOk so in fs2-rabbit it will be Stream[F, Queue.UnbindOk]
  3. Add the new method at the end of the Fs2Rabbit file.
  4. Add scala doc comments to the new method. Take a look at the existent methods but in general it could be more or less similar to the Java client docs. This is the one for this method: "Unbind a queue from an exchange, with no extra arguments."
  5. Add a unit test in the file Fs2RabbitSpec: I can help you out with this part since it's not as trivial as adding the new method and sometimes it might be okay to just invoke the method on an existent test instead of creating a new one.

HINT: Look at the existent method bindQueue, this one will be very similar ;)

Thanks for contributing!

Support all standard properties in AmqpProperties

It looks like there is no way to add a correlation id or a message id to a message. By extension, it would be nice to support all the standard properties (user_id, type, app_id, ...) in addition to those which are already supported (content_type, content_encoding, priority, deliveryMode).

Add support for different types of basicConsume

Implement these methods:

  • basicConsume(String queue, boolean autoAck, Consumer callback) [IMPLEMENTED]
  • basicConsume(String queue, boolean autoAck, Map<String,Object> arguments, Consumer callback)
  • basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String,Object> arguments, Consumer callback)
  • basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback)
  • basicConsume(String queue, Consumer callback)

It might require some design decisions.

Check the signatures in the java doc of the official java client:
https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html
