amqp's Introduction

AMQP

Simple Elixir wrapper for the Erlang RabbitMQ client.

The API is based on Langohr, a Clojure client for RabbitMQ.

Upgrading guides

To upgrade from the old version, please read our upgrade guides:

Usage

Add AMQP as a dependency in your mix.exs file.

def deps do
  [
    {:amqp, "~> 3.3"}
  ]
end

Elixir will start amqp automatically with this if you use Elixir 1.6+.

If that's not the case (use Application.started_applications/0 to check), try adding :amqp to applications or extra_applications in your mix.exs. Or call Application.ensure_started(:amqp) at the start.
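As a rough illustration of the extra_applications route mentioned above, a mix.exs could look like the sketch below. The module name MyApp.MixProject, the app name :my_app and the version are placeholders, not part of AMQP.

```elixir
# mix.exs (sketch; MyApp.MixProject and :my_app are placeholder names)
defmodule MyApp.MixProject do
  use Mix.Project

  def project do
    [
      app: :my_app,
      version: "0.1.0",
      deps: deps()
    ]
  end

  # Listing :amqp here asks Mix to start it before your own application.
  def application do
    [
      extra_applications: [:logger, :amqp]
    ]
  end

  defp deps do
    [
      {:amqp, "~> 3.3"}
    ]
  end
end
```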

After you are done, run mix deps.get in your shell to fetch and compile AMQP. Start an interactive Elixir shell with iex -S mix.

iex> {:ok, conn} = AMQP.Connection.open()
# {:ok, %AMQP.Connection{pid: #PID<0.165.0>}}

iex> {:ok, chan} = AMQP.Channel.open(conn)
# {:ok, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.165.0>}, pid: #PID<0.177.0>}}

iex> AMQP.Queue.declare(chan, "test_queue")
# {:ok, %{consumer_count: 0, message_count: 0, queue: "test_queue"}}

iex> AMQP.Exchange.declare(chan, "test_exchange")
# :ok

iex> AMQP.Queue.bind(chan, "test_queue", "test_exchange")
# :ok

iex> AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok

iex> {:ok, payload, meta} = AMQP.Basic.get(chan, "test_queue")
iex> payload
# "Hello, World!"

iex> AMQP.Queue.subscribe(chan, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
# {:ok, "amq.ctag-5L8U-n0HU5doEsNTQpaXWg"}

iex> AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok
# Received: Hello, World!

Set up a consumer GenServer

defmodule Consumer do
  use GenServer
  use AMQP

  def start_link do
    GenServer.start_link(__MODULE__, [], [])
  end

  @exchange    "gen_server_test_exchange"
  @queue       "gen_server_test_queue"
  @queue_error "#{@queue}_error"

  def init(_opts) do
    {:ok, conn} = Connection.open("amqp://guest:guest@localhost")
    {:ok, chan} = Channel.open(conn)
    setup_queue(chan)

    # Limit unacknowledged messages to 10
    :ok = Basic.qos(chan, prefetch_count: 10)
    # Register the GenServer process as a consumer
    {:ok, _consumer_tag} = Basic.consume(chan, @queue)
    {:ok, chan}
  end

  # Confirmation sent by the broker after registering this process as a consumer
  def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
    {:noreply, chan}
  end

  # Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
  def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
    {:stop, :normal, chan}
  end

  # Confirmation sent by the broker to the consumer process after a Basic.cancel
  def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
    {:noreply, chan}
  end

  def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
    # You might want to run payload consumption in separate Tasks in production
    consume(chan, tag, redelivered, payload)
    {:noreply, chan}
  end

  defp setup_queue(chan) do
    {:ok, _} = Queue.declare(chan, @queue_error, durable: true)
    # Messages rejected without requeue in the main queue are dead-lettered to the error queue
    {:ok, _} = Queue.declare(chan, @queue,
                             durable: true,
                             arguments: [
                               {"x-dead-letter-exchange", :longstr, ""},
                               {"x-dead-letter-routing-key", :longstr, @queue_error}
                             ]
                            )
    :ok = Exchange.fanout(chan, @exchange, durable: true)
    :ok = Queue.bind(chan, @queue, @exchange)
  end

  defp consume(channel, tag, redelivered, payload) do
    number = String.to_integer(payload)
    if number <= 10 do
      :ok = Basic.ack channel, tag
      IO.puts "Consumed a #{number}."
    else
      :ok = Basic.reject channel, tag, requeue: false
      IO.puts "#{number} is too big and was rejected."
    end

  rescue
    # Requeue unless it's a redelivered message.
    # This means we will retry consuming a message once in case of exception
    # before we give up and have it moved to the error queue
    #
    # You might also want to catch :exit signal in production code.
    # Make sure you call ack, nack or reject otherwise consumer will stop
    # receiving messages.
    _exception ->
      :ok = Basic.reject channel, tag, requeue: not redelivered
      IO.puts "Error converting #{payload} to integer"
  end
end

iex> Consumer.start_link
{:ok, #PID<0.261.0>}
iex> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.165.0>}}
iex> {:ok, chan} = AMQP.Channel.open(conn)
{:ok, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.165.0>}, pid: #PID<0.177.0>}}
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "5"
:ok
Consumed a 5.
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "42"
:ok
42 is too big and was rejected.
iex> AMQP.Basic.publish chan, "gen_server_test_exchange", "", "Hello, World!"
:ok
Error converting Hello, World! to integer
Error converting Hello, World! to integer

Configuration

Connections and channels

You can define a connection and channel in your config, and AMQP will automatically:

  • Open the connection and channel at the start of the application
  • Try to reconnect if they are disconnected

config :amqp,
  connections: [
    myconn: [url: "amqp://guest:guest@myhost:12345"],
  ],
  channels: [
    mychan: [connection: :myconn]
  ]

You can access the connection/channel via AMQP.Application.

iex> {:ok, chan} = AMQP.Application.get_channel(:mychan)
iex> :ok = AMQP.Basic.publish(chan, "", "", "Hello")

When a channel goes down and is reopened, make sure your consumer subscribes again on the new channel.

See the documentation for AMQP.Application.get_connection/1 and AMQP.Application.get_channel/1 for more details.

Types of arguments and headers

The arguments parameter in Queue.declare, Exchange.declare and Basic.consume, and the headers parameter in Basic.publish, are lists of tuples in the form {name, type, value}. Here name is a binary containing the argument/header name, type is an atom describing the AMQP field type, and value is a term compatible with that AMQP field type.

The valid AMQP field types are:

:longstr | :signedint | :decimal | :timestamp | :table | :byte | :double | :float | :long | :short | :bool | :binary | :void | :array

Valid argument names in Queue.declare include:

  • "x-expires"
  • "x-message-ttl"
  • "x-dead-letter-routing-key"
  • "x-dead-letter-exchange"
  • "x-max-length"
  • "x-max-length-bytes"

Valid argument names in Basic.consume include:

  • "x-priority"
  • "x-cancel-on-ha-failover"

Valid argument names in Exchange.declare include:

  • "alternate-exchange"
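To make the {name, type, value} form concrete, here is a minimal sketch that builds an arguments list for Queue.declare. The QueueArgs module, the queue names and the TTL value are hypothetical and only serve the illustration; the argument names and the :longstr and :long types are taken from this page.

```elixir
defmodule QueueArgs do
  @moduledoc "Builds {name, type, value} argument tuples (illustrative helper, not part of AMQP)."

  # Dead-letter messages to `error_queue` via the default exchange ("")
  # and expire messages after `ttl_ms` milliseconds.
  def with_dead_letter(error_queue, ttl_ms)
      when is_binary(error_queue) and is_integer(ttl_ms) do
    [
      {"x-dead-letter-exchange", :longstr, ""},
      {"x-dead-letter-routing-key", :longstr, error_queue},
      {"x-message-ttl", :long, ttl_ms}
    ]
  end
end

args = QueueArgs.with_dead_letter("my_queue_error", 60_000)

# With an open channel `chan`, the list would be passed as:
#   AMQP.Queue.declare(chan, "my_queue", durable: true, arguments: args)
```

Keeping the tuples in one helper makes it easier to declare the same queue with identical arguments from several places; RabbitMQ rejects a redeclaration whose arguments differ from the existing queue.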

Troubleshooting / FAQ

Consumer stops receiving messages

It usually happens when your code doesn't send an acknowledgement (ack, nack or reject) after receiving a message.

If you use GenServer for your consumer, try storing the number of messages the server is currently processing in the GenServer state.

If the number equals prefetch_count, those messages were left without acknowledgements and that's why the consumer has stopped receiving more messages.
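One way to keep that count is sketched below. The InFlight module and its function names are hypothetical and not part of AMQP; the sketch assumes you call delivered/1 when a :basic_deliver message arrives and settled/1 after each ack, nack or reject.

```elixir
defmodule InFlight do
  @moduledoc "Counts deliveries that have not been acknowledged yet (illustrative helper)."

  # Initial state, to be kept inside the GenServer state.
  def new, do: %{in_flight: 0}

  # Call when a {:basic_deliver, payload, meta} message arrives.
  def delivered(%{in_flight: n} = state), do: %{state | in_flight: n + 1}

  # Call after Basic.ack, Basic.nack or Basic.reject for a delivery.
  def settled(%{in_flight: n} = state) when n > 0, do: %{state | in_flight: n - 1}

  # True when every prefetch slot is held by an unacknowledged message.
  def stalled?(%{in_flight: n}, prefetch_count), do: n >= prefetch_count
end

state = InFlight.new() |> InFlight.delivered() |> InFlight.delivered()
IO.inspect(InFlight.stalled?(state, 2), label: "stalled with prefetch_count 2")
```

Logging stalled?/2 periodically is a cheap way to confirm whether missing acknowledgements are the reason deliveries stopped.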

Also review the following points:

  • how an exception is handled when one is raised
  • how an :exit signal is handled when one is sent
  • what happens when processing a message takes a long time

Also make sure that the consumer monitors the channel pid. When the channel is gone, you have to reopen it and subscribe to a new channel again.

Version compatibility

Check out this article to find out the compatibility with Elixir, OTP and RabbitMQ.

Heartbeats

If the connection is dropped unexpectedly, consider enabling heartbeats.

You can set the heartbeat option when you open a connection.

For more details, read this article
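A minimal sketch of passing the option follows. The HeartbeatExample module, the host and the 30-second interval are placeholders, and the interval is assumed to be in seconds, following the RabbitMQ convention.

```elixir
defmodule HeartbeatExample do
  # Connection options; :heartbeat is assumed to be an interval in seconds.
  # The host and the value 30 are placeholders for illustration.
  def connection_options do
    [host: "localhost", heartbeat: 30]
  end

  # Opens a connection with the heartbeat option set.
  # Needs the amqp dependency and a reachable broker at call time.
  def open do
    AMQP.Connection.open(connection_options())
  end
end
```

Calling HeartbeatExample.open() returns {:ok, conn} on success or an error tuple if the broker cannot be reached.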

Does the library support AMQP 1.0?

Currently the library doesn't support AMQP 1.0 and there is no plan to do so at the moment. Our main aim here (at least for now) is to provide a thin wrapper around amqp_client for Elixir programmers.

Copyright and License

Copyright (c) 2014 Paulo Almeida

This library is MIT licensed. See the LICENSE for details.

amqp's People

Contributors

akoutmos, aymanosman, balena, begedin, binarin, craigp, edgurgel, erez-rabih, evnu, fredberger, haljin, jvoegele, kianmeng, leodag, lucacorti, nathanl, octosteve, ono, pma, rrrene, sircinek, slavazim, stratus3d, sunboshan, take-five, thiamsantos, timothyvanderaerden, whatyouhide, woylie, zaherhs

amqp's Issues

Suspend Channel or Connection

Hi,

I need to suspend or temporarily stop receiving messages from my queue. I close the channel like this:

Channel.close(chan)

When I try to reopen the channel (not the connection) to start receiving messages from the queue again, I call

Channel.open(chan)

But I receive the error message

no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

How can I do this ?

Thanks

Getting amqp (Hex package) ** (ArgumentError) argument error

While running make all step for RabbitMQ Server 3.7.0, I am getting the following error:

Putting child 0x95a5d000 (escript/rabbitmqctl) PID 20257 on the chain.
Live child 0x95a5d000 (escript/rabbitmqctl) PID 20257
GEN escript/rabbitmqctl
Resolving Hex dependencies...
Dependency resolution completed:
amqp 0.2.4
csv 2.0.0
dialyxir 0.5.1
json 1.0.2
parallel_stream 1.0.5
simetric 0.2.0
temp 0.4.3
* Getting amqp (Hex package)
** (ArgumentError) argument error
    (stdlib) re.erl:734: :re.run("https://repo.hex.pm/tarballs/amqp-0.2.4.tar", {:re_pattern, 1, 0, 0, <<69, 82, 67, 80, 91, 0, 0, 0, 0, 0, 0, 0, 81, 0, 0, 0, 255, 255, 255, 255, 255, 255, 255, 255, 47, 0, 64, 0, 0, 0, 1, 0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, ...>>}, [{:capture, :all, :index}, :global])
    (elixir) lib/regex.ex:590: Regex.do_replace/4
    (hex) lib/hex/scm.ex:103: Hex.SCM.update/1
    (hex) lib/hex/scm.ex:150: Hex.SCM.checkout/1
    (mix) lib/mix/dep/fetcher.ex:64: Mix.Dep.Fetcher.do_fetch/3
    (mix) lib/mix/dep/converger.ex:180: Mix.Dep.Converger.all/9
    (mix) lib/mix/dep/converger.ex:189: Mix.Dep.Converger.all/9
    (mix) lib/mix/dep/converger.ex:119: Mix.Dep.Converger.all/7
Reaping losing child 0x95a5d000 PID 20257
Makefile:73: recipe for target 'escript/rabbitmqctl' failed
make: *** [escript/rabbitmqctl] Error 1
Removing child 0x95a5d000 PID 20257 from chain.

Conflict between lager and elixir logger when using amqp as a dependency

When adding amqp ~> 1.0.0 as dependency in a fresh mix project and starting interactive mix, we get an error message:

09:58:35.987 [error] Supervisor 'Elixir.Logger.Supervisor' had child 'Elixir.Logger.ErrorHandler' started with 'Elixir.Logger.Watcher':start_link({error_logger,'Elixir.Logger.ErrorHandler',{true,false,500}}) at <0.455.0> exit with reason normal in context child_terminated

This can be mitigated by starting lager before logger in our application, but it would be unfortunate to have to change the code of all our applications to work around this error.

Connection Recovery and Built-in Supervision Tree?

I'm starting a project that needs solid RabbitMQ support, including failing over to another connection when there is a network partition or a node in the Rabbit cluster dies.

I would also like to have a standard supervision tree that I can just drop into an application and provide some configuration for which queues and bindings I would like to receive messages from and which functions should be called for those messages.

That way I don't have to re-invent the supervision tree and connection retry semantics on my own each time.

I have some experience doing this in Ruby, and I see that there is one potential solution in Erlang, but I would rather invest my time in making this available to the hex.pm community.

Are these features things you would like to see in your project? Or should I start a separate project that builds on top of amqp/amqp-client?

Support passing arguments and headers without specifying the AMQP type

For known arguments in Queue.declare, Exchange.declare and Basic.consume, like "x-message-ttl" the API should accept arguments: ["x-message-ttl": 5000] and convert automatically to arguments: [{"x-message-ttl", :long, 5000}] as expected by the Erlang client.

This can also be applied to headers in Basic.publish, where the most common types (:longstr, :long and :bool) could be inferred from the value.

The format {name, type, value} can still be supported for all other cases.
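The inference idea could be sketched roughly as follows. HeaderTypes is a hypothetical module, not an existing AMQP function, and this version infers the type from the value only (booleans, integers and binaries), rather than from a table of known argument names as the proposal suggests for arguments.

```elixir
defmodule HeaderTypes do
  @moduledoc "Sketch of inferring AMQP field types from plain values (hypothetical helper)."

  # Explicit {name, type, value} tuples pass through unchanged.
  def infer({name, type, value}) when is_atom(type), do: {to_string(name), type, value}

  # Two-element entries get a type inferred from the value.
  def infer({name, value}) when is_boolean(value), do: {to_string(name), :bool, value}
  def infer({name, value}) when is_integer(value), do: {to_string(name), :long, value}
  def infer({name, value}) when is_binary(value), do: {to_string(name), :longstr, value}

  # Converts a mixed list of keyword entries and explicit tuples.
  def infer_all(entries), do: Enum.map(entries, &infer/1)
end

IO.inspect(HeaderTypes.infer_all(["x-message-ttl": 5000]))
```

Any value outside those three kinds raises a FunctionClauseError in this sketch, which keeps the explicit tuple form as the only way to send other field types.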

@peck This would also address #4

** (stop) :unexpected_delivery_and_no_default_consumer

Hi mates,

I am seeing strange behavior when running my consumer. Once it is started by the main app supervisor, I see the following errors:

23:51:10.144 [error] GenServer #PID<0.328.0> terminating
** (stop) exited in: :gen_server2.call(#PID<0.327.0>, {:consumer_call, {:"basic.cancel_ok", "amq.ctag-acZaaX2r1zshRRALqkfieg"}, {:"basic.cancel", "amq.ctag-acZaaX2r1zshRRALqkfieg", false}}, :infinity)
    ** (EXIT) :unexpected_delivery_and_no_default_consumer
    (rabbit_common) src/gen_server2.erl:340: :gen_server2.call/3
    (amqp_client) src/amqp_channel.erl:745: :amqp_channel.handle_method_from_server1/3
    (stdlib) gen_server.erl:615: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:681: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

It only happens on my Rabbit QA instance; locally it works fine.

The consumer does receive messages normally 🤔

Any thoughts?

Connection not monitored

Hi,
More than an issue this is a question!

I was wondering why the connection.pid is not monitored. We had the GenServer dying on us because of a dropped heartbeat and couldn't understand what was happening until we read the comment in the source code.

If it's a matter of putting some time to harden the code against a missed heartbeat let me know, I'd be happy to help.

Elixir 1.2

Can we relax the Elixir dependency a bit please?

It's running fine on Elixir 1.2.0 for me.

** (Mix) Could not start application recon: could not find application file: recon.app

This is a very strange error. I used the same version in another project and it works fine there. Now, when I create a new project on the same machine, add the amqp library and try to start the server, I get this error.

[info] Application syntax_tools exited: :stopped
[info] Application xmerl exited: :stopped
[info] Application cowboy exited: :stopped
[info] Application cowlib exited: :stopped
[info] Application ranch exited: :stopped
[info] Application runtime_tools exited: :stopped
 
=INFO REPORT==== 30-Nov-2017::19:05:28 ===
    application: logger
    exited: stopped
    type: temporary
** (Mix) Could not start application recon: could not find application file: recon.app

deps

    [
      {:phoenix, "~> 1.3.0"},
      {:phoenix_pubsub, "~> 1.0"},
      {:phoenix_ecto, "~> 3.2"},
      {:postgrex, ">= 0.0.0"},
      {:phoenix_html, "~> 2.10"},
      {:phoenix_live_reload, "~> 1.0", only: :dev},
      {:gettext, "~> 0.11"},
      {:cowboy, "~> 1.0"},
      {:amqp, "~> 1.0.0-pre.2"}
    ]

The other old project runs without errors

Configuration via config.exs

I wonder if we could configure AMQP connection credentials via standard elixir config.exs.
As I can see it is common practice, and this library should follow it.

I can start work on it and make a PR.

@pma, what do you think?

Unhandled terminate function clause

This may be inside the underlying amqp lib. If you think this is a lower level issue please feel free to close.

{{:function_clause, [{:amqp_gen_connection, :terminate, [{:function_clause, [{:inet_dns, :encode_labels, [<<0, 97, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 10, 98, 114, 105, 115, 107, 45, 98, 101, 97, 114, 3, 114, 109, 113, 9, 99, 108, 111, 117, 100, 97, 109, 113, 112, ...>>, {4, {["brisk-bear", "rmq", "cloudamqp", "com", "", "home"], 12, nil, {["rmq", "cloudamqp", "com", "", "home"], 23, {["cloudamqp", "com", "", "home"], 27, nil, {["com", "", "home"], 37, nil, nil}}, nil}}}, 41, ["", "home"]], [file: 'inet_dns.erl', line: 694]}, {:inet_dns, :encode_name, 4, [file: 'inet_dns.erl', line: 675]}, {:inet_dns, :encode_query_section, 3, [file: 'inet_dns.erl', line: 269]}, {:inet_dns, :encode, 1, [file: 'inet_dns.erl', line: 240]}, {:inet_res, :make_query, 5, [file: 'inet_res.erl', line: 670]}, {:inet_res, :make_query, 4, [file: 'inet_res.erl', line: 638]}, {:inet_res, :res_query, 6, [file: 'inet_res.erl', line: 622]}, {:inet_res, :res_getby_query, 4, [file: 'inet_res.erl', line: 589]}]}, {#PID<0.869.0>, {:amqp_params_network, "device_863", 
"eyJhbGciOiJREDACTEDXVCJ9.eyJhdWQiOiJib3QiLCJzdWIiOjg2NCwiaWF0IjoxNTI0NTc5NDM2LCJqdGkiOiJlYWVjYzE1MS04YWRiLTQwMWItYjc3YS0zZDkxMTkzMzNmOWUiLCJpc3MiOiIvL215LmZhcm1ib3QuaW86NDQzIiwiZXhwIjoxNTI4MDM1NDM2LCJtcXR0IjoiYnJpc2stYmVhci5ybXEuY2xvdWRhbXFwLmNvbSIsImJvdCI6ImRldmljZV84NjMiLCJ2aG9zdCI6InZiemN4c3FyIiwibXF0dF93cyI6IndzczovL2JyaXNrLWJlYXIucm1xLmNsb3VkYW1xcC5jb206NDQzL3dzL21xdHQiLCJvc191cGRhdGVfc2VydREDACTEDGkuZ2l0aHViLmNvbS9yZXBvcy9mYXJtYm90L2Zhcm1ib3Rfb3MvcmVsZWFzZXMvbGF0ZXN0IiwiaW50ZXJpbV9lbWFpbCI6InN0ZXBoZW5hZkBsZWVzY2hvb2xzLm5ldCIsImZ3X3VwZGF0ZV9zZXJ2ZXIiOiJERVBSRUNBVEVEIiwiYmV0YV9vc191cGRhdGVfc2VydmVyIjoiaHR0cHM6Ly9hcGkuZ2l0aHViLmDONTHACKMEvcy9GYXJtQm90L2Zhcm1ib3Rfb3MvcmVsZWFzZXMvMTA2MzU4NjgifQ.tHF5Q5ALVHoELfsafjGvQ2iBCKEoHlzEt5w4wL1Yj31TJipxfBJNgnfXcO_-8iS4jtevo4yfzujmNUlj8nHfnF-F-PuREDACTEDvKxi8N8np8Uu2Q2DXb02CA19wQ33T5K-onTNtTny8sTbZs5MtmupNd_l0LrcnWMwm3ujg-cbHR4bAYAOTPYvs9Jc7moSdd_tPyFd8lRh5xmI-ajgclNBVdq-JC_KQRN0QiHWA_VvnraaHQv0jt027j_8hnVN8DkqDrkQH0rA02Fy8PK3F_iODOle-6FrNZXOZuuvssxCgjkjp7jiBUCtmDBCemBAIro7ZVWXbc3lUQvSOnMg", "vbzcxsqr", 'brisk-bear.rmq.cloudamqp.com', 5672, 0, 0, 0, :infinity, :none, [&:amqp_auth_mechanisms.plain/3, &:amqp_auth_mechanisms.amqplain/3], [], []}}], [file: 'src/amqp_gen_connection.erl', line: 239]}, {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 648]}, {:gen_server, :terminate, 10, [file: 'gen_server.erl', line: 833]}, {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}, {:gen_server, :call, [#PID<0.870.0>, :connect, :infinity]}}

Send a message including type

Hi,

I need to send a message with the type field set.
How can I do it? Can I use Basic.publish?

Thanks in advance

Direct connections to broker should be supported

I'm currently trying to implement a RabbitMQ plugin in Elixir, and having a direct connection is an essential piece of functionality.

I see 2 possible ways to handle this:

  1. Add a new function AMQP.Connection.open_direct/1, which will accept a completely different set of options compared to open/1
  2. Add 2 more clauses to open/1 which use an is_record guard to check for directly supplied amqp_params_network and amqp_params_direct records and pass them through unchanged.

WDYT? What option is acceptable? I'll be more than happy to make a PR on this issue.

Compilation failed on Erlang 19

Hello,

I upgraded to Erlang 19 and Elixir 1.3.1 and I get a compilation error. I believe the problem is with amqp_client and if you want me to file a bug there I will do it. The traceback is as follows:

$ iex -S mix
Erlang/OTP 19 [erts-8.0] [source] [64-bit] [smp:4:4] [async-threads:10] [kernel-poll:false]

==> amqp_client (compile)
include/amqp_gen_consumer_spec.hrl:30: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:31: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:32: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:34: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:35: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:36: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:37: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:38: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:39: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:42: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:30: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:31: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:32: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:34: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:35: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:36: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:37: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:38: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:39: syntax error before: '/'
include/amqp_gen_consumer_spec.hrl:42: syntax error before: '/'
Compiling src/amqp_selective_consumer.erl failed:
ERROR: compile failed while processing /home/iamd3vil/projects/strategies/deps/amqp_client: rebar_abort

This is on Ubuntu 14.04.

Let me know if you want to know anything else.

AMQP.Basic.publish does not publish message when expiration is set

It is not possible to set the per-message TTL.
There is a strange issue when using the expiration option with Basic.publish. The following code only publishes the messages that do not set the expiration option. Messages that set it are not published and no error is returned; they seem to be silently ignored.

The following code publishes only the messages Test 1 to Test 4 to RabbitMQ:
# Connection and queue declaration removed for readability

AMQP.Basic.publish(channel, "", "test", "Test 1")
AMQP.Basic.publish(channel, "", "test", "Test 2")
AMQP.Basic.publish(channel, "", "test", "Test 3", [])

for i <- 1..500 do
  AMQP.Basic.publish(channel, "", "test", "Test with options!#{to_string(i)}", [expiration: 5000])
end
AMQP.Basic.publish(channel, "", "test", "Test 4", [])
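One possible explanation, not confirmed anywhere in this thread: the RabbitMQ documentation states that the per-message expiration field must be a string holding the TTL in milliseconds, while the code above passes the integer 5000. The sketch below converts the value before publishing. The Expiration module is a hypothetical helper, and whether this resolves the reported behavior is an assumption.

```elixir
defmodule Expiration do
  # Converts a TTL in milliseconds to the string form that RabbitMQ
  # documents for the per-message expiration field.
  def to_option(ms) when is_integer(ms) and ms >= 0, do: Integer.to_string(ms)
end

opts = [expiration: Expiration.to_option(5000)]

# With an open channel, the options would be passed as:
#   AMQP.Basic.publish(channel, "", "test", "Test with options!", opts)
```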

Unable to change the queue mode

I expected to be able to change the queue mode from lazy to default with:

Queue.declare(chan, @queue, durable: true, arguments: [{"x-dead-letter-exchange", :longstr, ""}, {"x-dead-letter-routing-key", :longstr, @queue_error}, {"x-queue-mode", :longstr, "default"}, {"x-max-priority", :short, 10}])

but this does not seem to work. The queue will still display queue-mode: lazy

I am running on rabbit 3.7.3 and Erlang 20.1.

Detecting Connection and Channel Down

Hi,

I would like to know how I can detect errors when the connection or channel goes down.

My module runs under a supervisor, and I can open a channel and read messages from a queue.
If I go to the RabbitMQ management application and kill my connection, I receive the message:
[warn] Connection (#PID<0.158.0>) closing: received hard error {:"connection.close", 320, "CONNECTION_FORCED - Closed via management plugin", 0, 0} from server

But the supervisor does not start a new process. My handle_info clauses match basic_cancel, basic_cancel_ok and a catch-all. By the way, the same supervisor works when my GenEvent handler crashes.

Why was none of the handle_info clauses selected?

Announcement - Plan for 1.0.0

We plan to release 1.0.0 in September. The release will include some backward incompatible changes.

Planned changes

  • Handle error responses from amqp_client and return {:error, atom} instead of raising MatchError in such cases (#68).
  • Related to the above, we will review the rest of APIs and fix some inconsistencies if needed.

TODOs

  • Release 0.2.3 with typespec improvements
  • Create 0.2 branch for the stable release
  • Tweak README on master branch with the information
  • Merge #68 and release 1.0.0-pre.1
  • Review the code base again and make a PR for other inconsistent API tweaks
  • Update the document with a clear instruction about changes and upgrading
  • Release 1.0.0-pre.2
  • Wait at least one week after the final release candidate
  • Release 1.0.0

Upgrade guide

https://github.com/pma/amqp/wiki/Upgrade-from-0.X-to-1.0

Can't publish a message into the exclusive queue

Hello,

I'm writing tests for my own small package, but I'm stuck on an issue where Elixir can't find a matching function clause:

18:01:33.475 [error] Process #PID<0.256.0> raised an exception
** (FunctionClauseError) no function clause matching in AMQP.Basic.publish/5
    (amqp) lib/amqp/basic.ex:51: AMQP.Basic.publish([connection: %AMQP.Connection{pid: #PID<0.238.0>}, config: [connection_timeout: 10000, username: "user", password: "password", host: "localhost", port: 5672, virtual_host: "/"]], "test.direct", "amq.gen-8CS8a9aGGBJzpCpt9LY2lw", "OK", [persistent: true])
    test/worker_test.exs:111: SpotterWorkerTest.CustomWorker.consume/5
18:01:33.475 [error] Error in process <0.256.0> with exit value:
{function_clause,[{'Elixir.AMQP.Basic',publish,[[{connection,#{'__struct__' => 'Elixir.AMQP.Connection',pid => <0.238.0>}},{config,[{connection_timeout,10000},{username,<<"user">>},{password,<<"password">>},{host,<<"localhost">>},{port,5672},{virtual_host,<<"/">>}]}],<<"test.direct">>,<<"amq.gen-8CS8a9aGGBJzpCpt9LY2lw">>,<<"OK">>,[{persistent,true}]],[{file,"lib/amqp/basic.ex"},{line,51}]},{'Elixir.SpotterWorkerTest.CustomWorker',consume,5,[{file,"test/worker_test.exs"},{line,111}]}]}


  1) test CustomWorker forwards message to the next queue (SpotterWorkerTest)
     test/worker_test.exs:135
     ** (MatchError) no match of right hand side value: {:empty, %{cluster_id: ""}}
     code: {:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
     stacktrace:
       test/worker_test.exs:145: (test)

The code that I'm using for testing purposes is pretty straightforward:

defmodule MyTest do
  use ExUnit.Case
  use AMQP

  @generic_exchange "test.direct"
  @generic_queue_request "worker_test_queue_request"
  @generic_queue_forward "worker_test_queue_forward"

  @custom_amqp_opts [
    username: "user",
    password: "password",
    host: "localhost",
    port: 5672,
    virtual_host: "/"
  ]

  defmodule CustomWorker do
    use Spotter.Worker

    @exchange "test.direct"
    @queue_request "worker_test_queue_request"
    @queue_forward "worker_test_queue_forward"

    def configure(connection, _config) do
      {:ok, channel} = AMQP.Channel.open(connection)
      :ok = AMQP.Exchange.direct(channel, @exchange, durable: true)

      # An initial point where the worker do required stuff
      {:ok, _} = AMQP.Queue.declare(channel, @queue_request, durable: true)
      :ok = AMQP.Queue.bind(channel, @queue_request, @exchange, routing_key: @queue_request)

      # Queue for a valid messages
      {:ok, _} = AMQP.Queue.declare(channel, @queue_forward, durable: true)
      :ok = AMQP.Queue.bind(channel, @queue_forward, @exchange, routing_key: @queue_forward)

      :ok = AMQP.Basic.qos(channel, prefetch_count: 1)
      {:ok, _} = AMQP.Basic.consume(channel, @queue_request)

      {:ok, :done}
    end

    # Handle the trapped exit call
    def handle_info({:EXIT, _from, reason}, state) do
      {:stop, reason, state}
    end

    # Confirmation sent by the broker after registering this process as a consumer
    def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, channel) do
      {:noreply, channel}
    end

    # Sent by the broker when the consumer is unexpectedly cancelled
    def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, channel) do
      {:stop, :normal, channel}
    end

    # Confirmation sent by the broker to the consumer process after a Basic.cancel
    def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, channel) do
      {:noreply, channel}
    end

    # Invoked when a message successfully consumed
    def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, channel) do
      spawn fn -> consume(channel, tag, reply_to, headers, payload) end
      {:noreply, channel}
    end

    # Processing a message
    defp consume(channel, tag, reply_to, headers, payload) do
      # an issue in the following line 
      :ok = AMQP.Basic.publish(channel, @exchange, reply_to, "OK", [persistent: true]) 
      AMQP.Basic.ack(channel, tag)
    end
  end

  setup_all do
    {:ok, pid} = CustomWorker.start_link(@custom_amqp_opts)
    {:ok, [worker: pid]}
  end

  def create_client_connection() do
    AMQP.Connection.open(@custom_amqp_opts)
  end

  def create_response_queue(connection) do
    {:ok, channel} = AMQP.Channel.open(connection)
    :ok = AMQP.Exchange.direct(channel, @generic_exchange, passive: true)

    {:ok, queue} = AMQP.Queue.declare(channel, "", exclusive: true, durable: true, auto_delete: false)
    :ok = AMQP.Queue.bind(channel, queue[:queue], @generic_exchange, routing_key: queue[:queue])

    {:ok, channel, queue}
  end

  test "CustomWorker forwards message to the next queue", _state do
    {:ok, connection} = create_client_connection()
    {:ok, channel, queue} = create_response_queue(connection)

    :ok = AMQP.Basic.publish(channel, @generic_exchange, @generic_queue_request, "VALID_DATA",
                             persistent: true,
                             reply_to: queue[:queue],
                             headers: [{:path, "api.matchmaking.search"},
                                       {:permissions, "get;post"}, ]
    )
    {:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
    assert payload == "OK"

    AMQP.Queue.delete(channel, queue)
    AMQP.Connection.close(connection)
  end
end

Any ideas why this happens and how I can fix it? I looked at the publish arguments and implementation in basic.ex, and as far as I can tell everything there is fine.

expiration option not working

Hello,

When I use the expiration option, the message is not sent to my queue. I checked whether it expired instantly, but it did not. When I comment out the expiration option, it works. When I create a message in the RabbitMQ web interface, it works as expected.

def create_queue(req, chan) do
    {:ok, format} = Timex.format(req.launch_at, "%Y%m%d", :strftime)
    queue = "requests_#{format}"
    :ok = Exchange.direct(chan, @exchange, durable: true)
    {:ok, _} = Queue.declare(chan, @ready_queue, durable: true) # Declare the dead letter queue target
    {:ok, _} = Queue.declare(chan, queue, durable: true)
    :ok = Queue.bind(chan, @ready_queue, @exchange, routing_key: "ready")
    {:ok,  queue }
end

def publish(req) do
    {:ok, conn} = Connection.open(@conn_str)
    {:ok, chan} = Channel.open(conn)
    {:ok, queue} = create_queue(req, chan)

    :ok = Basic.publish(
      chan,
      "",
      queue,
      req.id,
      persistent: true,
      mandatory: true,
      headers: [
        {"x-dead-letter-exchange", :longstr, @exchange},
        {"x-dead-letter-routing-key", :longstr, "ready"}
      ],
      expiration: 10000
    )

    :ok = Connection.close(conn)
end


Am I missing something?

Best regards

`rabbit_common` dependency doesn't compile on newer rebar versions

On newer versions of rebar (tested with 2.5.1), the rabbit_common dependency doesn't compile.

@mobileoverlord has found a rabbit_common version from @d0rc that seems to fix this.

See here for the error: https://travis-ci.org/mschae/rebar_problem/builds/53253499#L109-L116
And here for a fixed version: https://travis-ci.org/mschae/rebar_problem/builds/56574002

The trick is to use {:amqp_client, github: "d0rc/amqp_client", override: true} as the amqp_client version: https://github.com/mschae/rebar_problem/compare/alternative_rebar_common?expand=0

Failed Publish Returns `:ok` but the channel is killed

This might be intended behavior, but I'm trying to understand how I should handle this in my application code.

To Reproduce

{:ok, conn} = AMQP.Connection.open("amqp://guest:guest@localhost")
{:ok, chann} = AMQP.Channel.open(conn)
AMQP.Confirm.select(chann) # => :ok
AMQP.Basic.publish chann, "non-existent-exchange", "routing.key", "ohai" # => returns :ok
AMQP.Confirm.wait_for_confirms(chann)

The call to wait_for_confirms raises an error like:

** (exit) exited in: :gen_server.call(#PID<0.1275.0>, {:wait_for_confirms, :infinity}, :infinity)
    ** (EXIT) no process
         (stdlib) gen_server.erl:212: :gen_server.call/3
    (amqp_client) src/amqp_channel.erl:239: :amqp_channel.wait_for_confirms/2

The Request

Ideally, if I'm specifically waiting for publisher confirms, it would be awesome if I could get back an error message from my AMQP.Confirm.wait_for_confirms call: something I can pattern match on, rather than trying to catch an error.

I'm happy to help out with development on this if it is a use-case you want to handle in this library.
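As a stopgap in application code, the exit can be turned into a value that can be pattern matched. The following is only a minimal sketch, not part of the library: `ConfirmSketch` and `safe_call/1` are hypothetical names, and the helper simply wraps any zero-arity function so that an exit (such as a call to a channel process that has already died) becomes an error tuple.

```elixir
defmodule ConfirmSketch do
  @doc """
  Runs `fun` and returns `{:ok, result}`.

  If the call exits (for example because the process being called is no
  longer alive), returns `{:error, {:exit, reason}}` instead of crashing
  the caller.
  """
  def safe_call(fun) when is_function(fun, 0) do
    {:ok, fun.()}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end
end

# Intended use with the reproduction above (assumes `chann` is the channel
# opened there; not executed in this sketch):
#
#   case ConfirmSketch.safe_call(fn -> AMQP.Confirm.wait_for_confirms(chann) end) do
#     {:ok, result} -> result
#     {:error, {:exit, reason}} -> {:error, reason}
#   end
```

This does not tell you why the channel was closed; it only lets the caller handle the dead channel without rescuing at every call site.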

Supervising multiple consumers

Hi,

Our application has many consumers - one for each queue. We don't want to reopen a connection in each one (as per the example in the README) and think it'd be better to create a connection in one GenServer, then pass that in to many consumer GenServers and have them each open their own channel.

At the same time, we want to recover from failures both on the channel and the connection. Since we can't pass a conn around a supervision tree we'll have to do this manually, but in #31 you suggest not using Supervision at all because of the speedy connection retry. I think a channel failure should only require the consumer to be restarted, and a connection failure results in a new connection and a whole new set of consumers. Is there a good pattern for this problem?
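One possible shape for this, offered as a sketch under assumptions rather than a recommended pattern from the library: a process that owns the connection is started first under a `:rest_for_one` supervisor, followed by a nested `:one_for_one` supervisor for the consumers. All module names below are hypothetical, and the connection is replaced by a plain reference so the example runs without a broker.

```elixir
defmodule Sketch.ConnectionHolder do
  # Hypothetical stand-in for a process that owns the AMQP connection.
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  def get, do: GenServer.call(__MODULE__, :get)

  @impl true
  def init(:ok) do
    # A real holder would call AMQP.Connection.open/1 here and stop itself
    # when the connection goes down; a reference keeps the sketch broker-free.
    {:ok, make_ref()}
  end

  @impl true
  def handle_call(:get, _from, conn), do: {:reply, conn, conn}
end

defmodule Sketch.Consumer do
  # Hypothetical consumer: one process per queue.
  use GenServer

  def start_link(queue), do: GenServer.start_link(__MODULE__, queue)

  @impl true
  def init(queue) do
    # A real consumer would open its own channel on the shared connection
    # with AMQP.Channel.open/1 and then subscribe to its queue.
    {:ok, %{queue: queue, conn: Sketch.ConnectionHolder.get()}}
  end
end

defmodule Sketch.ConsumerSupervisor do
  use Supervisor

  def start_link(queues), do: Supervisor.start_link(__MODULE__, queues, name: __MODULE__)

  @impl true
  def init(queues) do
    children =
      for queue <- queues do
        Supervisor.child_spec({Sketch.Consumer, queue}, id: {:consumer, queue})
      end

    # :one_for_one restarts only the consumer that crashed.
    Supervisor.init(children, strategy: :one_for_one)
  end
end

defmodule Sketch.Supervisor do
  use Supervisor

  def start_link(queues), do: Supervisor.start_link(__MODULE__, queues, name: __MODULE__)

  @impl true
  def init(queues) do
    children = [Sketch.ConnectionHolder, {Sketch.ConsumerSupervisor, queues}]

    # :rest_for_one restarts every child started after the one that failed,
    # so a holder crash also restarts the consumer supervisor and with it
    # every consumer, which then pick up the new connection.
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```

Starting it with `Sketch.Supervisor.start_link(["queue_a", "queue_b"])` gives one holder and two consumers. The consumers look the connection up by the holder's registered name instead of receiving it as a start argument, which avoids passing a stale connection through the child specs.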

no process: the process is not alive or there's no process currently - TERMINATION!

Hi,

I am trying to use the amqp library more or less based on the consumer example from Readme.md.

The problem is that I need to use prefetch, which works, but I would also like to use reject with the requeue option. That is when some weird stuff happens: I get an error like the one below and it looks like everything is terminated.

Am I doing something in a way that I shouldn't? Or is this a bug?

So basically I start the consumer, then I get 2 messages. When I ack one of them, the next one pops up, BUT randomly the process is killed for an unknown reason. The same goes for rej.

** (EXIT from #PID<0.378.0>) evaluator process exited with reason: exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

14:58:06.040 [error] GenServer Rabbitmq.Consumer terminating
** (stop) exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:214: :gen_server.call/3
    (rabbitmq) lib/rabbitmq/consumer.ex:62: Rabbitmq.Consumer.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:rej, 3}}
State: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.389.0>}, pid: #PID<0.398.0>}

My code looks like this:

defmodule Rabbitmq.Consumer do
    @moduledoc """
    Module for listening on the RabbitMQ server.

    ## Params
        - connection_string
        - exchange
        - queue
        - queue_error
    """
    use GenServer
    use AMQP
    require Logger

    @exchange    "test1"
    @queue       "queue1"
    @queue_error "#{@queue}_error"

    ########
    # APIS #
    ########
    @doc """
    Starts to listen to given RabbitMQ queue

    ## Examples

        iex> {:ok, pid} = Rabbitmq.Listener.start_link
        iex> Process.alive? pid
        true

    ## TODO: 
        - Maybe the supervisor could send the configuration for queue, connection string,
          exchange, queue_error and name of the module through the start link?
        - Also the payload could be sent through the start_link(imei |> DeviceDataProcessing.start?)
        - Examples in the start_link/1 should probably not exist!
    """
    def start_link do
      GenServer.start_link(__MODULE__, [], name: __MODULE__)
    end
  
    def ack(tag) do
        GenServer.cast(__MODULE__, {:ack, tag})
    end

    def rej(tag) do
        GenServer.cast(__MODULE__, {:rej, tag})
    end

    #############
    # CALLBACKS #
    #############
    def init(_opts) do
        rabbitmq_connect()
    end

    def handle_cast({:ack, tag}, channel) do
        Basic.ack channel, tag
        {:noreply, channel}
    end

    def handle_cast({:rej, tag}, channel) do
        Basic.reject channel, tag, requeue: true
        {:noreply, channel}
    end
  
    # Confirmation sent by the broker after registering this process as a consumer
    def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
      {:noreply, chan}
    end
  
    # Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
    def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
      {:stop, :normal, chan}
    end
  
    # Confirmation sent by the broker to the consumer process after a Basic.cancel
    def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
      {:noreply, chan}
    end
  
    def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
      spawn fn -> consume(chan, tag, redelivered, payload) end
      {:noreply, chan}
    end

    # 2. Implement a callback to handle DOWN notifications from the system
    #    This callback should try to reconnect to the server
    def handle_info({:DOWN, _, :process, _pid, _reason}, _) do
        {:ok, chan} = rabbitmq_connect()
        {:noreply, chan}
    end
    
    #####################
    # PRIVATE FUNCTIONS #
    #####################
    defp consume(channel, tag, redelivered, payload) do
        

        # TODO: Add Logic worker assigned to process the data from RabbitMQ queue

        Logger.debug "#{inspect payload}"

        rescue
            # Requeue unless it's a redelivered message.
            # This means we will retry consuming a message once in case of exception
            # before we give up and have it moved to the error queue
            #
            # You might also want to catch :exit signal in production code.
            # Make sure you call ack, nack or reject, otherwise the consumer will stop
            # receiving messages.
            exception ->
                Basic.reject channel, tag, requeue: not redelivered
                Logger.error "[Rabbitmq.Listener] Error with payload: #{inspect payload},\nwith error: #{inspect exception}"
    end

    defp rabbitmq_connect do
        Application.get_env(:rabbitmq, :connection_string)
            |> Connection.open 
            |> connection_status
    end

    defp connection_status({:ok, conn}) do
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Basic.qos(chan, prefetch_count: 2)
        Queue.declare(chan, @queue_error, durable: true)
        Exchange.direct(chan, @exchange, durable: true)
        Queue.bind(chan, @queue, @exchange)
        {:ok, _consumer_tag} = Basic.consume(chan, @queue)
        Logger.debug "[Rabbitmq.Listener] Connected to RabbitMQ..."
        {:ok, chan}
    end

    defp connection_status({:error, reason}) do
        Logger.warn "[Rabbitmq.Listener] Something is wrong, can't connect #{inspect reason}, trying to reconnect..."
        :timer.sleep(10000)
        rabbitmq_connect()
    end
end

Consumer stop getting messages

I'm having a weird issue and wondering if anyone has any insight. We are noticing that our consumer (based on the example in the README) stops getting messages from the RabbitMQ server after some period of time. The process is alive. The channel and connection processes can be queried (Process.info), so they are alive. A simple :kill of the consumer returns everything to normal. There is nothing in the rabbitmq-server logs that I can see.

Thanks

Unable to reuse consumers

Hi guys,

I am trying to create a small abstraction to reduce the boilerplate needed to expose consumers, but I am seeing strange behavior when consuming messages.

Basically I have this:

  • lib/app.ex
defmodule App do
  use Application
  import Supervisor.Spec

  def start(_type, _args), do: Supervisor.start_link(children, opts)

  defp opts, do: [strategy: :one_for_one, name: App.Supervisor]

  defp children do
    [supervisor(App.Repo, []),
     supervisor(App.Endpoint, []),
     worker(App.OrderCreatedListener, [], id: :order_created_listener),
     worker(App.OrderUpdatedListener, [], id: :order_updated_listener)]
  end
end
  • order_created_listener.ex
defmodule App.OrderCreatedListener do
  @moduledoc false

  use App.EventBus.Listener,
    virtual_host: System.get_env("BROKER_VHOST"),
    queue_name: "order_queue",
    exchange: "event_bus",
    routing_key: "order.created",
    handler: App.OrderCreatedHandler

  def start_link(opts \\ %{}),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
  • order_updated_listener.ex
defmodule App.OrderUpdatedListener do
  @moduledoc false

  use App.EventBus.Listener,
    virtual_host: System.get_env("BROKER_VHOST"),
    queue_name: "order_queue",
    exchange: "event_bus",
    routing_key: "order.updated",
    handler: App.OrderUpdatedHandler

  def start_link(opts \\ %{}),
    do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
  • event_bus/listener
defmodule App.EventBus.Listener do
  @moduledoc false

  defmacro __using__(options) do
    quote location: :keep do
      use GenServer
      use AMQP
      require Logger

      @opts unquote(options)

      def init(_) do
        queue_error = "#{@opts[:queue_name]}.errors"
        queue_name = @opts[:queue_name]
        exchange = @opts[:exchange]
        routing_key = @opts[:routing_key]

        {:ok, chan} = create_conn
        {:ok, _consumer_tag} =
        chan
        |> create_queue(queue_error)
        |> create_topic(exchange)
        |> create_queue(queue_name, dead_letter_opts(queue_error))
        |> bind_queue(queue_name, exchange, routing_key: routing_key)
        |> subscribe(queue_name)
        {:ok, [chan]}
      end

      def handle_info({:basic_deliver, payload, %{delivery_tag: tag,
          redelivered: redelivered}}, [chan] = state) do
        spawn fn ->
          try do
            payload
            |> Poison.decode!
            |> @opts[:handler].handle # When consuming a message, the handler reference is always the first one declared which in case is -> worker(App.OrderCreatedListener, [], id: :order_created_listener)
            AMQP.Basic.ack(chan, tag)
          rescue
            err ->
              Logger.error("Failed to handle message #{inspect(payload)} " <>
              "with error #{inspect(err)}")
              AMQP.Basic.reject(chan, tag, requeue: not redelivered)
          end
        end
        {:noreply, state}
      end

      def handle_info({:DOWN, _, :process, _pid, _reason}, [_]) do
        {:ok, chan} = create_conn
        {:noreply, [chan]}
      end

      def handle_info(_msg, state), do: {:noreply, state}

      defp create_conn,
        do: App.EventBus.ConnectionManager.create_conn

      defp create_queue(chan, queue_name, opts \\ []) do
        options = opts ++ [durable: true]
        AMQP.Queue.declare(chan, queue_name, options)
        chan
      end

      defp create_topic(chan, exchange, opts \\ []) do
        options = opts ++ [durable: true]
        AMQP.Exchange.topic(chan, exchange, options)
        chan
      end

      defp bind_queue(chan, queue_name, exchange, opts) do
        AMQP.Queue.bind(chan, queue_name, exchange, opts)
        chan
      end

      defp subscribe(chan, queue_name),
        do: AMQP.Basic.consume(chan, queue_name)

      defp dead_letter_opts(queue_error) do
        [arguments: [
            {"x-dead-letter-exchange", :longstr, ""},
            {"x-dead-letter-routing-key", :longstr, queue_error}]]
      end
    end
  end
end

As shown, I declared two consumers in lib/app.ex: order_created_listener and order_updated_listener. So far so good.

When I send a message to the listener OrderCreatedListener with routing_key=order.created, the @opts[:handler] reference comes from the right consumer: @opts[:handler] = OrderCreatedListener.

But when I send a message to the listener OrderUpdatedListener with routing_key=order.updated, the @opts[:handler] reference comes from the first consumer declared, which in this case is worker(App.OrderCreatedListener), so the handler points to @opts[:handler] = OrderCreatedListener when it should be OrderUpdatedListener.

The init inside the macro correctly injects the options from the listeners/consumers that use it:

def init(_) do
  queue_error = "#{@opts[:queue_name]}.errors"
  queue_name = @opts[:queue_name]
  exchange = @opts[:exchange]
  routing_key = @opts[:routing_key]
  # I printed the values here and it is working

When a message arrives from RabbitMQ, it always calls the first consumer declared in the App.children function, which in this case is worker(App.OrderCreatedListener, [], id: :order_created_listener).

I've looked at the code and concluded that it should call different consumers, since it references the id.

Any thoughts here, mates?

TCP connection close after 60s

RabbitMQ 3.7.3
AMQP 1.0.0
Erlang 20.2.2
Elixir 1.6.0

Start a RabbitMQ server locally, then use the amqp client to connect to it.

Erlang/OTP 20 [erts-9.2] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Interactive Elixir (1.6.0) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> AMQP.Connection.open()
{:ok, %AMQP.Connection{pid: #PID<0.223.0>}}
iex(2)> # wait 60s
21:37:31.909 [error] GenServer #PID<0.227.0> terminating
** (stop) {:socket_error, :timeout}
Last message: {:inet_async, #Port<0.6798>, 5, {:error, :timeout}}
State: {:state, #Port<0.6798>, #PID<0.223.0>, #PID<0.225.0>, {:method, :rabbit_framing_amqp_0_9_1}, {:expecting_header, ""}}

21:37:31.925 [error] GenServer #PID<0.223.0> terminating
** (stop) {:socket_error, :timeout}
Last message: {:socket_error, :timeout}
State: {:state, :amqp_network_connection, {:state, #Port<0.6798>, "client 127.0.0.1:50526 -> 127.0.0.1:5672", 60, #PID<0.226.0>, 131072, #PID<0.222.0>, :undefined, false}, #PID<0.225.0>, {:amqp_params_network, "guest", "guest", "/", 'localhost', 5672, 0, 0, 0, :infinity, :none, [&:amqp_auth_mechanisms.plain/3, &:amqp_auth_mechanisms.amqplain/3], [], []}, 0, [{"capabilities", :table, [{"publisher_confirms", :bool, true}, {"exchange_exchange_bindings", :bool, true}, {"basic.nack", :bool, true}, {"consumer_cancel_notify", :bool, true}, {"connection.blocked", :bool, true}, {"consumer_priorities", :bool, true}, {"authentication_failure_close", :bool, true}, {"per_consumer_qos", :bool, true}, {"direct_reply_to", :bool, true}]}, {"cluster_name", :longstr, "rabbit@Boshans-MacBook-Pro"}, {"copyright", :longstr, "Copyright (C) 2007-2018 Pivotal Software, Inc."}, {"information", :longstr, "Licensed under the MPL.  See http://www.rabbitmq.com/"}, {"platform", :longstr, "Erlang/OTP 20.2.2"}, {"product", :longstr, "RabbitMQ"}, {"version", :longstr, "3.7.3"}], :none, false}

21:37:31.909 [error] gen_server <0.227.0> terminated with reason: {socket_error,timeout}
21:37:31.909 [error] CRASH REPORT Process <0.227.0> with 0 neighbours exited with reason: {socket_error,timeout} in gen_server:handle_common_reply/8 line 726
21:37:31.925 [error] gen_server <0.223.0> terminated with reason: {socket_error,timeout}
21:37:31.925 [error] Supervisor {<0.222.0>,amqp_connection_type_sup} had child main_reader started with amqp_main_reader:start_link(#Port<0.6798>, <0.223.0>, <0.225.0>, {method,rabbit_framing_amqp_0_9_1}, <<"client 127.0.0.1:50526 -> 127.0.0.1:5672">>) at <0.227.0> exit with reason {socket_error,timeout} in context child_terminated
21:37:31.925 [error] Supervisor {<0.222.0>,amqp_connection_type_sup} had child main_reader started with amqp_main_reader:start_link(#Port<0.6798>, <0.223.0>, <0.225.0>, {method,rabbit_framing_amqp_0_9_1}, <<"client 127.0.0.1:50526 -> 127.0.0.1:5672">>) at <0.227.0> exit with reason reached_max_restart_intensity in context shutdown
21:37:31.926 [error] CRASH REPORT Process <0.223.0> with 0 neighbours exited with reason: {socket_error,timeout} in gen_server:handle_common_reply/8 line 726
21:37:31.926 [error] Supervisor {<0.221.0>,amqp_connection_sup} had child connection started with amqp_gen_connection:start_link(<0.222.0>, {amqp_params_network,<<"guest">>,<<"guest">>,<<"/">>,"localhost",5672,0,0,0,infinity,none,[#Fun<amq..>,...],...}) at <0.223.0> exit with reason {socket_error,timeout} in context child_terminated
21:37:31.926 [error] Supervisor {<0.221.0>,amqp_connection_sup} had child connection started with amqp_gen_connection:start_link(<0.222.0>, {amqp_params_network,<<"guest">>,<<"guest">>,<<"/">>,"localhost",5672,0,0,0,infinity,none,[#Fun<amq..>,...],...}) at <0.223.0> exit with reason reached_max_restart_intensity in context shutdown

Debugging with Wireshark shows that 60s after the TCP connection from the amqp client to the RabbitMQ server is established, the client sends a TCP FIN to the server and the connection is closed.

Note that during those 60s, the connection is established successfully and I'm able to create a channel and publish message to an exchange.

Is this a bug in the amqp client, or did I do something wrong?

Thanks in advance!

Feature: Typespecs

It would be extremely helpful to have Elixir typespecs for the core Erlang Records. This would help with creating a generic AMQP GenServer Behaviour.

Multiple issues with dependencies during install of :amqp

To reproduce - generate fresh project:

mix new test
cd test/

Add this as specified in readme:

  def application do
    [applications: [:logger, :amqp]]
  end
...
  defp deps do
    [{:amqp, "0.2.0-pre.1"}]
  end

Then try to install:

$ mix deps.get
Could not find Hex, which is needed to build dependency :amqp
Shall I install Hex? (if running non-interactively, use: "mix local.hex --force") [Yn] Y

12:27:50.838 [error] Failed updating session:
   ProfileName: :httpc_mix
   SessionId:   {{'repo.hex.pm', 443}, #PID<0.119.0>}
   Pos:         9
   Value:       true
when
   Session (db) info: :undefined
   Session (db):      {:session, {{'repo.hex.pm', 443}, #PID<0.119.0>}, false, :https,
 {:sslsocket, {:gen_tcp, #Port<0.4907>, :tls_connection, :undefined},
  #PID<0.120.0>}, {:essl, []}, 1, :keep_alive, false}
   Session (record):  {:EXIT,
 {:badarg,
  [{:ets, :lookup,
    [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.119.0>}], []},
   {:httpc_manager, :lookup_session, 2, [file: 'httpc_manager.erl', line: 189]},
   {:httpc_handler, :update_session, 4,
    [file: 'httpc_handler.erl', line: 1909]},
   {:httpc_handler, :maybe_make_session_available, 2,
    [file: 'httpc_handler.erl', line: 1516]},
   {:httpc_handler, :answer_request, 3,
    [file: 'httpc_handler.erl', line: 1507]},
   {:httpc_handler, :handle_response, 1,
    [file: 'httpc_handler.erl', line: 1251]},
   {:httpc_handler, :handle_info, 2, [file: 'httpc_handler.erl', line: 471]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 601]}]}}
   T: :error
   E: :badarg

12:27:50.855 [error] Failed updating session:
   ProfileName: :httpc_mix
   SessionId:   {{'repo.hex.pm', 443}, #PID<0.119.0>}
   Pos:         9
   Value:       true
when
   Session (db) info: :undefined
   Session (db):      {:session, {{'repo.hex.pm', 443}, #PID<0.119.0>}, false, :https,
 {:sslsocket, {:gen_tcp, #Port<0.4907>, :tls_connection, :undefined},
  #PID<0.120.0>}, {:essl, []}, 1, :keep_alive, false}
   Session (record):  {:EXIT,
 {:badarg,
  [{:ets, :lookup,
    [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.119.0>}], []},
   {:httpc_manager, :lookup_session, 2, [file: 'httpc_manager.erl', line: 189]},
   {:httpc_handler, :update_session, 4,
    [file: 'httpc_handler.erl', line: 1909]},
   {:httpc_handler, :maybe_make_session_available, 2,
    [file: 'httpc_handler.erl', line: 1516]},
   {:httpc_handler, :answer_request, 3,
    [file: 'httpc_handler.erl', line: 1507]},
   {:httpc_handler, :terminate, 2, [file: 'httpc_handler.erl', line: 759]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 629]},
   {:gen_server, :terminate, 7, [file: 'gen_server.erl', line: 795]}]}}
   T: :error
   E: :badarg

12:27:50.880 [error] GenServer #PID<0.119.0> terminating
** (stop) {:failed_updating_session, [profile: :httpc_mix, session_id: {{'repo.hex.pm', 443}, #PID<0.119.0>}, pos: 9, value: true, etype: :error, error: :badarg, stacktrace: [{:ets, :update_element, [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.119.0>}, {9, true}], []}, {:httpc_manager, :update_session, 4, [file: 'httpc_manager.erl', line: 210]}, {:httpc_handler, :update_session, 4, [file: 'httpc_handler.erl', line: 1887]}, {:httpc_handler, :maybe_make_session_available, 2, [file: 'httpc_handler.erl', line: 1516]}, {:httpc_handler, :answer_request, 3, [file: 'httpc_handler.erl', line: 1507]}, {:httpc_handler, :terminate, 2, [file: 'httpc_handler.erl', line: 759]}, {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 629]}, {:gen_server, :terminate, 7, [file: 'gen_server.erl', line: 795]}]]}
    (inets) httpc_handler.erl:1911: :httpc_handler.update_session/4
    (inets) httpc_handler.erl:1516: :httpc_handler.maybe_make_session_available/2
    (inets) httpc_handler.erl:1507: :httpc_handler.answer_request/3
    (inets) httpc_handler.erl:759: :httpc_handler.terminate/2
    (stdlib) gen_server.erl:629: :gen_server.try_terminate/3
    (stdlib) gen_server.erl:795: :gen_server.terminate/7
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:ssl, {:sslsocket, {:gen_tcp, #Port<0.4907>, :tls_connection, :undefined}, #PID<0.120.0>}, <<0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 110, 1, 2, 0, 104, 101, 120, 45, 48, 46, 49, 53, 46, 48, 47, 101, 98, 105, 110, 47, 69, 108, 105, 120, 105, 114, 46, 77, 105, 120, 46, 84, 97, 115, 107, ...>>}
State: {:state, {:request, #Reference<0.0.2.133>, #PID<0.71.0>, 0, :https, {'repo.hex.pm', 443}, '/installs/1.3.0/hex-0.15.0.ez', [], :get, {:http_request_h, :undefined, 'keep-alive', :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, 'repo.hex.pm', :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, [], 'Mix/1.3.1', :undefined, :undefined, :undefined, '0', :undefined, :undefined, :undefined, :undefined, :undefined, ...}, {[], []}, {:http_options, 'HTTP/1.1', :infinity, true, {:essl, []}, :undefined, true, :infinity, false}, 'https://repo.hex.pm/installs/1.3.0/hex-0.15.0.ez', [], :none, [], 1487680070263, :undefined, :undefined, false}, {:session, {{'repo.hex.pm', 443}, #PID<0.119.0>}, false, :https, {:sslsocket, {:gen_tcp, #Port<0.4907>, :tls_connection, :undefined}, #PID<0.120.0>}, {:essl, []}, 1, :keep_alive, false}, {'HTTP/1.1', 200, 'OK'}, {:http_response_h, 'public, max-age=604800', 'keep-alive', 'Tue, 21 Feb 2017 12:27:50 GMT', :undefined, :undefined, :undefined, :undefined, '1.1 varnish', :undefined, 'bytes', '354703', '"32639417cd22865fe37dd2eba7597f8a"', :undefined, :undefined, :undefined, 'AmazonS3', :undefined, :undefined, :undefined, :undefined, :undefined, '478371', :undefined, :undefined, :undefined, 'application/andrew-inset', :undefined, 'Fri, 23 Dec 2016 23:29:31 GMT', [{'x-amz-id-2', 'YcuzbLY69wJibQhxHS7Ab+vWP+pTax+yGM1tOYEYOuEp/rOmeUrWjnMU+NtwdILC9M/qM+uCaDQ='}, {'x-amz-request-id', '55D20D92FB98DA4B'}, {'x-amz-replication-status', 'FAILED'}, {'x-amz-meta-surrogate-key', 'installs'}, {'x-amz-version-id', 'OClpliznSKNcQv3EnXaKQigPoUrT5Ftm'}, {'fastly-debug-digest', '5db161298266899fd3e93f292ed51788ae6e376328c4d76c0190675b35234c98'}, {'x-served-by', 'cache-iad2130-IAD, cache-hhn1534-HHN'}, {'x-cache', 'HIT, HIT'}, {'x-cache-hits', '1, 1'}, {'x-timer', 
'S1487680070.414301,VS0,VE0'}]}, :undefined, {:httpc_response, :whole_body, [<<80, 75, 3, 4, 20, 0, 0, 0, 0, 0, 137, 3, 152, 73, 173, 155, 214, 230, 32, 108, 0, 0, 32, 108, 0, 0, 34, 0, 0, 0, 104, 101, 120, 45, 48, 46, 49, 53, 46, 48, ...>>, 478371]}, {[], []}, {[], []}, :new, [], :nolimit, :nolimit, {:options, {:undefined, []}, {:undefined, []}, 0, 2, 5, 120000, 2, :disabled, false, :inet, :default, :default, []}, {:timers, [], :undefined}, :httpc_mix, :inactive}
* creating /home/vagrant/.mix/archives/hex-0.15.0
Running dependency resolution...
Dependency resolution completed:
  amqp 0.2.0-pre.1
  amqp_client 3.6.7-pre.1
  rabbit_common 3.6.7-pre.1
* Getting amqp (Hex package)
  Checking package (https://repo.hex.pm/tarballs/amqp-0.2.0-pre.1.tar)
  Fetched package
* Getting amqp_client (Hex package)
  Checking package (https://repo.hex.pm/tarballs/amqp_client-3.6.7-pre.1.tar)
  Fetched package
$ iex -S mix
Erlang/OTP 19 [erts-8.0.2] [source-753b9b9] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Unchecked dependencies for environment dev:
* rabbit_common (Hex package)
  the dependency is not available, run "mix deps.get"
** (Mix) Can't continue due to errors on dependencies

Running mix deps.get a second time goes just fine:

mix deps.get
Running dependency resolution...
* Getting rabbit_common (Hex package)
  Checking package (https://repo.hex.pm/tarballs/rabbit_common-3.6.7-pre.1.tar)
  Using locally cached package

But then a problem of the same nature happens with the second dependency (rebar3):

$ iex -S mix
Erlang/OTP 19 [erts-8.0.2] [source-753b9b9] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]

Could not find "rebar3", which is needed to build dependency :rabbit_common
I can install a local copy which is just used by Mix
Shall I install rebar3? (if running non-interactively, use: "mix local.rebar --force") [Yn] Y
* creating /home/vagrant/.mix/rebar

12:35:42.966 [error] Failed updating session:
   ProfileName: :httpc_mix
   SessionId:   {{'repo.hex.pm', 443}, #PID<0.138.0>}
   Pos:         9
   Value:       true
when
   Session (db) info: :undefined
   Session (db):      {:session, {{'repo.hex.pm', 443}, #PID<0.138.0>}, false, :https,
 {:sslsocket, {:gen_tcp, #Port<0.5586>, :tls_connection, :undefined},
  #PID<0.139.0>}, {:essl, []}, 1, :keep_alive, false}
   Session (record):  {:EXIT,
 {:badarg,
  [{:ets, :lookup,
    [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.138.0>}], []},
   {:httpc_manager, :lookup_session, 2, [file: 'httpc_manager.erl', line: 189]},
   {:httpc_handler, :update_session, 4,
    [file: 'httpc_handler.erl', line: 1909]},
   {:httpc_handler, :maybe_make_session_available, 2,
    [file: 'httpc_handler.erl', line: 1516]},
   {:httpc_handler, :answer_request, 3,
    [file: 'httpc_handler.erl', line: 1507]},
   {:httpc_handler, :handle_response, 1,
    [file: 'httpc_handler.erl', line: 1251]},
   {:httpc_handler, :handle_info, 2, [file: 'httpc_handler.erl', line: 471]},
   {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 601]}]}}
   T: :error
   E: :badarg

12:35:42.967 [error] Failed updating session:
   ProfileName: :httpc_mix
   SessionId:   {{'repo.hex.pm', 443}, #PID<0.138.0>}
   Pos:         9
   Value:       true
when
   Session (db) info: :undefined
   Session (db):      {:session, {{'repo.hex.pm', 443}, #PID<0.138.0>}, false, :https,
 {:sslsocket, {:gen_tcp, #Port<0.5586>, :tls_connection, :undefined},
  #PID<0.139.0>}, {:essl, []}, 1, :keep_alive, false}
   Session (record):  {:EXIT,
 {:badarg,
  [{:ets, :lookup,
    [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.138.0>}], []},
   {:httpc_manager, :lookup_session, 2, [file: 'httpc_manager.erl', line: 189]},
   {:httpc_handler, :update_session, 4,
    [file: 'httpc_handler.erl', line: 1909]},
   {:httpc_handler, :maybe_make_session_available, 2,
    [file: 'httpc_handler.erl', line: 1516]},
   {:httpc_handler, :answer_request, 3,
    [file: 'httpc_handler.erl', line: 1507]},
   {:httpc_handler, :terminate, 2, [file: 'httpc_handler.erl', line: 759]},
   {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 629]},
   {:gen_server, :terminate, 7, [file: 'gen_server.erl', line: 795]}]}}
   T: :error
   E: :badarg

12:35:42.987 [error] GenServer #PID<0.138.0> terminating
** (stop) {:failed_updating_session, [profile: :httpc_mix, session_id: {{'repo.hex.pm', 443}, #PID<0.138.0>}, pos: 9, value: true, etype: :error, error: :badarg, stacktrace: [{:ets, :update_element, [:httpc_mix__session_db, {{'repo.hex.pm', 443}, #PID<0.138.0>}, {9, true}], []}, {:httpc_manager, :update_session, 4, [file: 'httpc_manager.erl', line: 210]}, {:httpc_handler, :update_session, 4, [file: 'httpc_handler.erl', line: 1887]}, {:httpc_handler, :maybe_make_session_available, 2, [file: 'httpc_handler.erl', line: 1516]}, {:httpc_handler, :answer_request, 3, [file: 'httpc_handler.erl', line: 1507]}, {:httpc_handler, :terminate, 2, [file: 'httpc_handler.erl', line: 759]}, {:gen_server, :try_terminate, 3, [file: 'gen_server.erl', line: 629]}, {:gen_server, :terminate, 7, [file: 'gen_server.erl', line: 795]}]]}
    (inets) httpc_handler.erl:1911: :httpc_handler.update_session/4
    (inets) httpc_handler.erl:1516: :httpc_handler.maybe_make_session_available/2
    (inets) httpc_handler.erl:1507: :httpc_handler.answer_request/3
    (inets) httpc_handler.erl:759: :httpc_handler.terminate/2
    (stdlib) gen_server.erl:629: :gen_server.try_terminate/3
    (stdlib) gen_server.erl:795: :gen_server.terminate/7
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:ssl, {:sslsocket, {:gen_tcp, #Port<0.5586>, :tls_connection, :undefined}, #PID<0.139.0>}, <<179, 208, 236, 238, 148, 180, 83, 8, 33, 252, 119, 167, 120, 208, 244, 210, 118, 222, 247, 121, 59, 211, 52, 133, 55, 213, 34, 216, 10, 120, 133, 64, 182, 196, 36, 163, 120, 179, 223, 199, 131, 41, 15, 135, 135, 222, 229, 109, ...>>}
State: {:state, {:request, #Reference<0.0.1.182>, #PID<0.80.0>, 0, :https, {'repo.hex.pm', 443}, '/installs/1.0.0/rebar-2.6.2', [], :get, {:http_request_h, :undefined, 'keep-alive', :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, 'repo.hex.pm', :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, [], 'Mix/1.3.1', :undefined, :undefined, :undefined, '0', :undefined, :undefined, :undefined, :undefined, :undefined, ...}, {[], []}, {:http_options, 'HTTP/1.1', :infinity, true, {:essl, []}, :undefined, true, :infinity, false}, 'https://repo.hex.pm/installs/1.0.0/rebar-2.6.2', [], :none, [], 1487680542620, :undefined, :undefined, false}, {:session, {{'repo.hex.pm', 443}, #PID<0.138.0>}, false, :https, {:sslsocket, {:gen_tcp, #Port<0.5586>, :tls_connection, :undefined}, #PID<0.139.0>}, {:essl, []}, 1, :keep_alive, false}, {'HTTP/1.1', 200, 'OK'}, {:http_response_h, 'public, max-age=604800', 'keep-alive', 'Tue, 21 Feb 2017 12:35:42 GMT', :undefined, :undefined, :undefined, :undefined, '1.1 varnish', :undefined, 'bytes', '368449', '"9b5bfb52e8b106dc5ef1bb1e21cf7485"', :undefined, :undefined, :undefined, 'AmazonS3', :undefined, :undefined, :undefined, :undefined, :undefined, '204449', :undefined, :undefined, :undefined, 'binary/octet-stream', :undefined, 'Tue, 28 Jun 2016 23:59:10 GMT', [{'x-amz-id-2', 'rMDvTzlux7WLEr8DqnkEUqM8Y3dMcP0CxVJQ4AAiWZnepvpoD2VfetTbx7K+F3pN/uYEnQuTEQM='}, {'x-amz-request-id', '737D5A3C20A8DA41'}, {'x-amz-replication-status', 'COMPLETED'}, {'x-amz-meta-surrogate-key', 'installs'}, {'x-amz-version-id', '0cgtvdRF75tyFYn.QPwoiys8_8hU2tMn'}, {'fastly-debug-digest', 'd858d9a6a6c39f818898cfe6e8ecb325cc9c225033a5896c7be5e5df1ba34d4d'}, {'x-served-by', 'cache-iad2131-IAD, cache-hhn1528-HHN'}, {'x-cache', 'HIT, HIT'}, {'x-cache-hits', '1, 1'}, {'x-timer', 'S1487680542.770337,VS0,VE0'}]}, 
:undefined, {:httpc_response, :whole_body, [<<35, 33, 47, 117, 115, 114, 47, 98, 105, 110, 47, 101, 110, 118, 32, 101, 115, 99, 114, 105, 112, 116, 10, 37, 37, 10, 37, 37, 33, 32, 45, 112, 97, 32, 114, 101, 98, 97, 114, 47, ...>>, 204449]}, {[], []}, {[], []}, :new, [], :nolimit, :nolimit, {:options, {:undefined, []}, {:undefined, []}, 0, 2, 5, 120000, 2, :disabled, false, :inet, :default, :default, []}, {:timers, [], :undefined}, :httpc_mix, :inactive}
* creating /home/vagrant/.mix/rebar3
===> Compiling rabbit_common
===> Compiling amqp_client
==> amqp
Compiling 9 files (.ex)
Generated amqp app
==> test
Compiling 1 file (.ex)
Generated test app
Interactive Elixir (1.3.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, conn} = AMQP.Connection.open

Exchange.declare doesn't allow declaring a custom type

We use the random-exchange plugin. When I try to declare an exchange with the custom type "x-random":

Exchange.declare(chan, exchange, :"x-random", durable: false)

I get the following error:

** (FunctionClauseError) no function clause matching in AMQP.Exchange.declare/4
    (amqp) lib/amqp/exchange.ex:32: AMQP.Exchange.declare(%AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.286.0>}, pid: #PID<0.297.0>}, "myservice.global", :"x-random", [durable: false])

Would it make sense to remove the type restriction in exchange.ex to make the library more flexible?

Durable / Stable Connections

Hi,

I want to use the library to keep a constant connection to RabbitMQ even if the server fails or drops the connection. In that situation, I'd expect the client to have some kind of reconnect logic.

I am following the GenServer pattern from the README with a supervisor, but if I stop the RabbitMQ server while the GenServer is running, I get a :socket_closed_unexpectedly message and the client never tries to reconnect, even when RabbitMQ comes back up. The supervisor does not try to start another client either (is it because we're returning the chan pid back to the supervision tree?).

Any suggestions for keeping a durable and stable connection to a RabbitMQ server? Am I using the supervision tree / GenServer pattern the wrong way?

Thanks for the help.
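
One possible approach, sketched under assumptions rather than taken from the library: monitor the connection process and reconnect when it goes down. The module name ReconnectingClient and the 5-second retry interval are hypothetical, and AMQP.Connection.open/0 is called with its default settings.

```elixir
defmodule ReconnectingClient do
  use GenServer

  # Hypothetical retry interval in milliseconds (an assumption, tune as needed).
  @reconnect_interval 5_000

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # Connect asynchronously so a broker outage does not crash init/1.
    send(self(), :connect)
    {:ok, nil}
  end

  @impl true
  def handle_info(:connect, _state) do
    with {:ok, conn} <- AMQP.Connection.open(),
         {:ok, chan} <- AMQP.Channel.open(conn) do
      # Ask for a :DOWN message when the connection process dies.
      Process.monitor(conn.pid)
      {:noreply, chan}
    else
      _error ->
        # Broker unreachable: schedule another attempt instead of crashing.
        Process.send_after(self(), :connect, @reconnect_interval)
        {:noreply, nil}
    end
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, _state) do
    # Connection lost: drop the stale channel and reconnect.
    send(self(), :connect)
    {:noreply, nil}
  end
end
```

The state is the current channel, or nil while disconnected. An alternative design is to stop the GenServer in the :DOWN clause and let the supervisor restart it, in which case init/1 performs the reconnect.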

AMQP.Queue.subscribe

Hello. If the function I pass raises an exception, the "reraise exception, stacktrace" call in do_consume terminates the process and it stops processing messages. I'd like it not to reraise. Is there a good reason for the reraise?

Connection.open should have options and a uri

The Connection.open function should accept a URI as well as a keyword list of options. The two shouldn't be mutually exclusive.

Example:

AMQP.Connection.open("amqp://guest:guest@localhost", ssl_options: [...])

Documentation on Numeric Arguments to Queue.declare

Need documentation on how to pass numeric arguments to Queue.declare, for example:

Queue.declare(chan, @queue, arguments: [{"x-message-ttl", :number, 5000}])

I've tried :number, :num, and :numeric, and also tried passing the value as a string as in the README, but without success.

Could this be documented in the readme?
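
For context, the underlying Erlang client tags each argument with an AMQP field type, and integer values are commonly passed as :long (:signedint is another integer type). A minimal sketch, assuming the three-element tuple form used in the question; QueueArgs and ttl_arguments/1 are hypothetical names, not part of the library:

```elixir
defmodule QueueArgs do
  # Hypothetical helper: builds the arguments list for a per-queue message
  # TTL, tagging the integer with the :long AMQP field type.
  def ttl_arguments(ttl_ms) when is_integer(ttl_ms) and ttl_ms >= 0 do
    [{"x-message-ttl", :long, ttl_ms}]
  end
end
```

With an open channel, this could be used as Queue.declare(chan, "my_queue", arguments: QueueArgs.ttl_arguments(5000)), where "my_queue" is a placeholder name.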

Upgrading amqp to work with latest 3.7.1 amqp_client and rabbit_common

@tag :skip_ci
  # This test assumes that we have a running RabbitMQ with short
  # nodename 'rabbit@localhost', and that it uses the same Erlang
  # cookie as we do. And it's better to have RabbitMQ of version
  # equal to that of amqp_client library used. We can achieve this
  # with following sequence of commands under the same user that
  # will run the test-suite:
  #
  # wget https://github.com/rabbitmq/rabbitmq-server/releases/download/rabbitmq_v3_6_9/rabbitmq-server-generic-unix-3.6.9.tar.xz
  # tar xJvf rabbitmq-server-generic-unix-3.6.9.tar.xz
  # cd rabbitmq_server-3.6.9
  # RABBITMQ_NODENAME=rabbit@localhost ./sbin/rabbitmq-server
  test "open direct connection" do
    :ok = ensure_distribution()
    assert {:ok, conn} = Connection.open_direct node: :rabbit@localhost
    assert :ok = Connection.close(conn)
  end

Why does this test exist? I cannot make it work, and it is not a very good test. Why not set up the environment before the test runs?

I'm looking for clarification and want to understand why it is ignored.
Should this be removed?

strange issue with amqp in a release

Anyone seen this error while trying to connect to a rabbit node with TLS after an exrm release?

The code runs fine from iex -S mix

17:09:58.596 [info]  Application wameku_client exited: WamekuClient.start(:normal, []) returned an error: shutdown: failed to start child: WamekuClient.Client.Producer
    ** (EXIT) exited in: :gen_server.call(#PID<0.864.0>, :connect, :infinity)
        ** (EXIT) an exception was raised:
            ** (FunctionClauseError) no function clause matching in :amqp_gen_connection.terminate/2
                (amqp_client) src/amqp_gen_connection.erl:230: :amqp_gen_connection.terminate({:bad_return_value, {:error, {:cannot_start_application, :asn1, {'no such file or directory', 'asn1.app'}}}}, {#PID<0.863.0>, {:amqp_params_network, "07c35345ebe6159d0926de64c788a48a", "password", "07c35345ebe6159d0926de64c788a48a", '127.0.0.1', 5671, 0, 0, 60, :infinity, [verify: :verify_none, fail_if_no_peer_cert: false], [&:amqp_auth_mechanisms.plain/3, &:amqp_auth_mechanisms.amqplain/3], [], []}})
                (stdlib) gen_server.erl:629: :gen_server.try_terminate/3
                (stdlib) gen_server.erl:795: :gen_server.terminate/7
                (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
{"Kernel pid terminated",application_controller,"{application_start_failure,wameku_client,{{shutdown,{failed_to_start_child,'Elixir.WamekuClient.Client.Producer',{{function_clause,[{amqp_gen_connection,terminate,[{bad_return_value,{error,{cannot_start_application,asn1,{\"no such file or directory\",\"asn1.app\"}}}},{<0.863.0>,{amqp_params_network,<<\"07c35345ebe6159d0926de64c788a48a\">>,<<\"password\">>,<<\"07c35345ebe6159d0926de64c788a48a\">>,\"127.0.0.1\",5671,0,0,60,infinity,[{verify,verify_none},{fail_if_no_peer_cert,false}],[#Fun<amqp_auth_mechanisms.plain.3>,#Fun<amqp_auth_mechanisms.amqplain.3>],[],[]}}],[{file,\"src/amqp_gen_connection.erl\"},{line,230}]},{gen_server,try_terminate,3,[{file,\"gen_server.erl\"},{line,629}]},{gen_server,terminate,7,[{file,\"gen_server.erl\"},{line,795}]},{proc_lib,init_p_do_apply,3,[{file,\"proc_lib.erl\"},{line,247}]}]},{gen_server,call,[<0.864.0>,connect,infinity]}}}},{'Elixir.WamekuClient',start,[normal,[]]}}}"}
Kernel pid terminated (application_controller) ({application_start_failure,wameku_client,{{shutdown,{failed_to_start_child,'Elixir.WamekuClient.Client.Producer',{{function_clause,[{amqp_gen_connection
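
The cannot_start_application error for asn1 in the log suggests that the asn1 application was not bundled into the release. One thing to try, offered as an assumption rather than a confirmed fix, is to list the OTP applications that TLS depends on in mix.exs so that the release tool includes them. The fragment below is a sketch; the WamekuClient module name is taken from the log, and the rest of the list is illustrative.

```elixir
# mix.exs (fragment, not a complete project file)
def application do
  [
    mod: {WamekuClient, []},
    # Assumption: listing :asn1, :public_key and :ssl explicitly makes the
    # release include them alongside :amqp.
    applications: [:logger, :amqp, :asn1, :public_key, :ssl]
  ]
end
```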

Can't send json message

When I try to publish a JSON message, I get a FunctionClauseError. The AMQP version is 0.1.4 and the Poison version is 1.4.0.

iex(1)> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.364.0>}}
iex(2)> {:ok, chan} = AMQP.Channel.open(conn)
{:ok,
 %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.364.0>}, pid: #PID<0.376.0>}}
iex(3)> AMQP.Queue.declare chan, "test_queue"
{:ok, %{consumer_count: 0, message_count: 0, queue: "test_queue"}}
iex(4)> AMQP.Exchange.declare chan, "test_exchange"
:ok
iex(5)> AMQP.Queue.bind chan, "test_queue", "test_exchange"
:ok
iex(6)> AMQP.Basic.publish(chan, "test_exchange", "", Poison.encode(%{ name: "S" }), [content_type: "application/json"])
:ok
iex(7)>
23:51:45.669 [error] Process #PID<0.377.0> raised an exception
** (FunctionClauseError) no function clause matching in :lists.reverse/1
    (stdlib) lists.erl:146: :lists.reverse({:ok, "{\"name\":\"S\"}"})
    (rabbit_common) src/rabbit_binary_generator.erl:84: :rabbit_binary_generator.build_content_frames/3
    (rabbit_common) src/rabbit_binary_generator.erl:73: :rabbit_binary_generator.build_simple_content_frames/4
    (rabbit_common) src/rabbit_writer.erl:312: :rabbit_writer.assemble_frames/5
    (rabbit_common) src/rabbit_writer.erl:342: :rabbit_writer.internal_send_command_async/3
    (rabbit_common) src/rabbit_writer.erl:210: :rabbit_writer.handle_message/3
    (rabbit_common) src/rabbit_writer.erl:194: :rabbit_writer.mainloop1/2
    (rabbit_common) src/rabbit_writer.erl:185: :rabbit_writer.mainloop/2
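
The trace shows :lists.reverse/1 receiving {:ok, "{\"name\":\"S\"}"}, which means the payload handed to the channel was the whole tuple returned by Poison.encode/1 rather than the JSON binary inside it. A minimal sketch of unwrapping the tuple before publishing; JsonPublisher, payload!/1 and publish_json/4 are hypothetical names, not part of AMQP or Poison:

```elixir
defmodule JsonPublisher do
  # Hypothetical helper module; it is not part of the AMQP library.

  # Unwraps the {:ok, json} tuple that Poison.encode/1 returns, so that only
  # the binary payload is handed to the channel.
  def payload!({:ok, json}) when is_binary(json), do: json

  def payload!({:error, reason}) do
    raise ArgumentError, "JSON encoding failed: #{inspect(reason)}"
  end

  # Publishes an already-encoded result with a JSON content type.
  def publish_json(chan, exchange, routing_key, encoded) do
    AMQP.Basic.publish(chan, exchange, routing_key, payload!(encoded),
      content_type: "application/json"
    )
  end
end
```

In the session above this would be called as JsonPublisher.publish_json(chan, "test_exchange", "", Poison.encode(%{name: "S"})). Calling Poison.encode!/1 instead, which returns the JSON directly or raises, avoids the need for a helper.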

Rename to something more descriptive and less official

I realize it's a stretch to hope for a change at this point, but this library has an unfortunate name. "AMQP" is the name of the protocol, not a client library. Further, the name could give a false sense of official-ness, which could be construed as unethical and may or may not cause issues down the road.

This is not a comment on the quality of the library, which I believe to be good. If anything, I want to enhance the usefulness and longevity of this project.

Like I said, I realize it's a stretch at this point.

Usage instructions in README cause error under 0.1.2 release

When following usage example from the README:

iex(6)> AMQP.Basic.publish chan, "test_exchange", "", "Hello, World!"
** (FunctionClauseError) no function clause matching in AMQP.Utils.to_type_tuple/1
    (amqp) lib/amqp/utils.ex:5: AMQP.Utils.to_type_tuple(:undefined)
    (amqp) lib/amqp/basic.ex:61: AMQP.Basic.publish/5

Reverting to 0.1.1, these examples work fine.

issue with 0.2.0 - publisher killed randomly

Hi everybody!

Thanks for the great work you put into this library. It's really appreciated!

I've run into a problem in my project after upgrading from 0.2.0-pre.2 to 0.2.0.

This is my publisher module:

defmodule Atreyu.Rabbit.Publisher do
  @moduledoc """
  rabbitmq publisher
  """
  use AMQP
  use GenServer
  # Logger.debug/1 is a macro, so the module must require Logger.
  require Logger

  @connection_params Application.get_env(:atreyu, :rabbit)[:connection_params]

  def start_link do
    Logger.debug "Starting RabbitMQ publisher"
    GenServer.start_link(__MODULE__, get_channel(), name: :publisher)
  end

  defp get_channel do
    with {:ok, conn} <- Connection.open(@connection_params),
         {:ok, chan} <- Channel.open(conn),
      do: chan
  end

  def publish(exchange, routing_key, payload) do
    GenServer.call(:publisher, {:publish, exchange, routing_key, payload})
  end
  
  def handle_call({:publish, exchange, routing_key, payload}, _from, chan) do
    :ok = Basic.publish chan, exchange, routing_key, payload
    {:reply, :ok, chan}
  end
end

Very simple and straightforward.

Now, in my test suite, everything works with 0.2.0-pre.2. With 0.2.0 I get random errors about the publisher being down and terminated. Some messages get delivered, but at some point in my test suite some messages start to kill the GenServer.

This is the error I get:

14:37:00.095 [error] GenServer :publisher terminating
** (stop) exited in: :gen_server.call(#PID<0.928.0>, {:call, {:"basic.publish", 0, "ania", "ania.atr.registra", false, false}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 1, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "{\"code\":\"CASO4\"}"}, #PID<0.930.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:212: :gen_server.call/3
    (atreyu) lib/atreyu/rabbit/publisher.ex:49: Atreyu.Rabbit.Publisher.handle_call/3
    (stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:647: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:publish, "ania", "ania.atr.registra", "{\"code\":\"CASO4\"}"}

This is a general error that simply says the GenServer is down, so the message cannot be delivered.

Does anybody want to share some ideas? Could this be related to 0f0c825 (the only relevant change between the two versions, besides amqp_client and rabbit_common)?

Many thanks in advance!
