ex_aws_sqs's Introduction

ExAws.SQS

Service module for https://github.com/ex-aws/ex_aws.

Installation

The package can be installed by adding :ex_aws_sqs to your list of dependencies in mix.exs along with :ex_aws and your preferred JSON codec / HTTP client:

def deps do
  [
    {:ex_aws, "~> 2.1"},
    {:ex_aws_sqs, "~> 3.3"},
    {:poison, "~> 3.0"},
    {:hackney, "~> 1.9"},
    {:saxy, "~> 1.1"}, # or {:sweet_xml, "~> 0.6"}
  ]
end

XML Parser

By default :ex_aws_sqs will use either :sweet_xml or :saxy to parse the XML responses from AWS, depending on which is installed. If both libraries are installed, :saxy is chosen first. To choose a parser explicitly, set :parser in your config:

config :ex_aws_sqs, parser: ExAws.SQS.SaxyParser
# OR
config :ex_aws_sqs, parser: ExAws.SQS.SweetXmlParser

But ensure no other dependencies are setting this value.

Copyright and License

The MIT License (MIT)

Copyright (c) 2014 CargoSense, Inc.

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

ex_aws_sqs's People

Contributors

balins, benwilson512, bernardd, istefo, josevalim, kianmeng, manelli, nsdavidson, ricn, tattdcodemonkey, yordis

ex_aws_sqs's Issues

Please publish #4 to hex

I am unable to add ex_aws_sqs to my project that is already on ex_aws 2.1, without downgrading ex_aws or referencing github due to the issue fixed in #4. Could you please push this change to hex?

Unable to receive `Message Group Id` message attribute (or any other attrs) when receiving a message

Environment

  • Elixir & Erlang versions (elixir --version):
Erlang/OTP 22 [erts-10.4.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Elixir 1.8.0 (compiled with Erlang/OTP 21)
  • ExAws version mix deps |grep ex_aws
* ex_aws 2.1.1 (Hex package) (mix)
  locked at 2.1.1 (ex_aws) 1e4de210
* ex_aws_sqs 2.0.1 (Hex package) (mix)
  locked at 2.0.1 (ex_aws_sqs) 42b19229
  • HTTP client version. IE for hackney do mix deps | grep hackney
* httpoison 1.5.1 (Hex package) (mix)

Current behavior

I am making a request with ExAws.SQS where queue_name is my queue

# have also tried specifying args as tuples
queue_name
|> ExAws.SQS.receive_message([
  max_number_of_messages: messages_to_receive,
  message_attributes: :all
])
|> ExAws.request()

However, I am not getting back any message_attributes for returned messages.
Example message from response:

       %{ 
         attributes: [],
         body: "some message body",
         md5_of_body: "some_md5_of_body",
         message_attributes: [],
         message_id: "some_message_id",
         receipt_handle: "some_receipt_handle"
       }

Expected behavior

The way I am making my request, I am not getting any message attributes returned even though I am specifying all. My max_number_of_messages option is being read and accepted.

I noticed that MessageGroupId is listed as a potential received message attribute on the AWS API docs https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html

but it is not listed in the type below, so I suspect this attribute may not be supported, and I am not sure why that would be: https://github.com/ex-aws/ex_aws_sqs/blob/master/lib/ex_aws/sqs.ex#L13

  @type sqs_message_attribute_name ::
    :sender_id |
    :sent_timestamp |
    :approximate_receive_count |
    :approximate_first_receive_timestamp |
    :wait_time_seconds |
    :receive_message_wait_time_seconds

It seems strange that I am not getting any message attributes at all.

Maintainer Wanted

Hey @TattdCodeMonkey, per https://elixirforum.com/t/proposal-exaws-2-0/9269 the ExAws project is splitting out each service into its own project.

You are the original contributor of the SQS service to ExAws. If you wish to be its maintainer, please let me know and I will give you commit access to this repository. If you do not want this, let me know and I will look for someone else.

Thank you!

Support Queue Tags

Description

Add support for Queue Tags. This requires adding support for the following new actions:

  • ListQueueTags
  • TagQueue
  • UntagQueue

Add optional Tags to CreateQueue.

Saxy.ParseError: unexpected byte "H", expected token: :lt

Environment

  • Elixir & Erlang versions (elixir --version):
ELIXIR_VERSION=1.14.0
OTP_VERSION=24.3.4
ALPINE_VERSION=3.15.3
  • ExAws version mix deps |grep ex_aws
* ex_aws 2.4.0 (Hex package) (mix)
  locked at 2.4.0 (ex_aws) 66dd0bac
* ex_aws_s3 2.3.3 (Hex package) (mix)
  locked at 2.3.3 (ex_aws_s3) 0044f0b6
* ex_aws_sqs 3.3.1 (Hex package) (mix)
  locked at 3.3.1 (ex_aws_sqs) 47d8fc29
  • HTTP client version. IE for hackney do mix deps | grep hackney
* hackney 1.18.1 (Hex package) (rebar3)
  locked at 1.18.1 (hackney) a4ecdaff

Current behavior

Saxy.ParseError: unexpected byte "H", expected token: :lt
File "lib/ex_aws/sqs/saxy_collector.ex", line 93, in ExAws.SQS.SaxyCollector.parse_string!/2
File "lib/ex_aws/sqs/saxy_parser.ex", line 261, in ExAws.SQS.SaxyParser.parse/2
File "lib/my_app/kafka_to_sqs_sink_connector.ex", line 46, in MyApp.KafkaToSqsSinkConnector.handle_message/2
File "/opt/app/deps/brod/src/brod_group_subscriber_worker.erl", line 79, in :brod_group_subscriber_worker.handle_message/3
File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 463, in :brod_topic_subscriber.handle_message_set/2
File "/opt/app/deps/brod/src/brod_topic_subscriber.erl", line 299, in :brod_topic_subscriber.handle_info/2
File "gen_server.erl", line 695, in :gen_server.try_dispatch/4
File "gen_server.erl", line 771, in :gen_server.handle_msg/6

Expected behavior

We have the following module

defmodule MyApp.KafkaToSqsSinkConnector do
  @behaviour :brod_group_subscriber_v2

  @prefetch_count 5

  def child_spec(args \\ []) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [args]},
      type: :worker,
      restart: :permanent,
      shutdown: 5000
    }
  end

  def start_link(args) do
    args =
      :my_app
      |> Application.get_env(__MODULE__, [])
      |> Keyword.get(:kafka, [])
      |> Keyword.merge(Keyword.get(args, :kafka, []))
      |> Keyword.update(
        :consumer_config,
        [prefetch_count: @prefetch_count],
        &Keyword.put(&1, :prefetch_count, @prefetch_count)
      )
      |> Keyword.update!(:group_id, &MyApp.Kafka.consumer_group_id/1)
      |> Keyword.put(:message_type, :message_set)
      |> Keyword.put(:cb_module, __MODULE__)
      |> Enum.into(%{})

    :brod_group_subscriber_v2.start_link(args)
  end

  @impl :brod_group_subscriber_v2
  def init(_group_id, _init_data) do
    {:ok, []}
  end

  @impl :brod_group_subscriber_v2
  def handle_message(message, state) do
    {:kafka_message_set, _topic_name, _, _, messages} = message

    message_batch = Enum.map(messages, &to_sqs_message_body/1)
    
    case ExAws.request(ExAws.SQS.send_message_batch(queue_url, message_batch), ExAws.Config.new(:sqs)) do
      {:ok, _} -> {:ok, :commit, state}
      {:error, reason} -> raise_error(reason)
    end
  end

  defp to_sqs_message_body({:kafka_message, _offset, _key, data, _action, _timestamp, _headers}) do
    [id: :erlang.phash2(data), message_body: data]
  end

  defp raise_error(error_message) do
    raise "#{__MODULE__} failed due to #{inspect(error_message)}"
  end

  defp queue_url do
    config()
    |> Keyword.fetch!(:sqs)
    |> Keyword.fetch!(:queue_url)
  end

  defp config do
    Application.get_env(:my_app, __MODULE__, [])
  end
end

An SQS sink connector for Kafka.

We have moved some messages without problems so far, but then we started getting the error above.
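
The parse error reports the byte "H" where "<" was expected, which suggests the body handed to the parser was not XML at all. A minimal sketch, assuming a hypothetical helper module XmlGuard (not part of ex_aws_sqs), of classifying a body before parsing so that the unexpected content can be logged instead of raising:

```elixir
defmodule XmlGuard do
  @moduledoc """
  Hypothetical helper, not part of ex_aws_sqs: classifies a response body
  before it is passed to an XML parser.
  """

  # Returns {:ok, body} when the first non-whitespace byte is "<".
  # Otherwise returns {:error, {:not_xml, preview}}, where preview is the
  # first 40 characters of the trimmed body, suitable for logging.
  def check(body) when is_binary(body) do
    case String.trim_leading(body) do
      "<" <> _rest -> {:ok, body}
      other -> {:error, {:not_xml, String.slice(other, 0, 40)}}
    end
  end
end
```

This only helps to capture what the service actually returned; it does not explain why a non-XML body arrived.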

Periodic connection errors

Environment

$ elixir --version
Erlang/OTP 22 [erts-10.4.4] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]

Elixir 1.9.1 (compiled with Erlang/OTP 22)
$ mix deps | grep ex_aws
* ex_aws 2.1.0 (Hex package) (mix)
  locked at 2.1.0 (ex_aws) b9265152
* ex_aws_sqs 3.0.0 (Hex package) (mix)
  locked at 3.0.0 (ex_aws_sqs) a1e3e04a
* ex_aws_s3 2.0.2 (Hex package) (mix)
  locked at 2.0.2 (ex_aws_s3) c0258bbd
$ mix deps | grep hackney
* hackney 1.15.1 (Hex package) (rebar3)
  locked at 1.15.1 (hackney) 9f8f471c

Current behavior

I'm seeing what I think are http connection errors that are happening at regular intervals:

22:22:34.830 [error] SqsTextractPoller #PID<0.3617.0> received unexpected message in handle_info/2: {:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.40>, :tls_connection, :undefined}, [#PID<0.3704.0>, #PID<0.3703.0>]}}

22:27:35.625 [error] SqsTextractPoller #PID<0.3617.0> received unexpected message in handle_info/2: {:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.42>, :tls_connection, :undefined}, [#PID<0.3715.0>, #PID<0.3714.0>]}}

22:35:16.663 [error] SqsTextractPoller #PID<0.3617.0> received unexpected message in handle_info/2: {:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.44>, :tls_connection, :undefined}, [#PID<0.3726.0>, #PID<0.3725.0>]}}

22:39:07.069 [error] SqsTextractPoller #PID<0.3617.0> received unexpected message in handle_info/2: {:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.45>, :tls_connection, :undefined}, [#PID<0.3732.0>, #PID<0.3731.0>]}}

22:54:29.647 [error] SqsTextractPoller #PID<0.3617.0> received unexpected message in handle_info/2: {:ssl_closed, {:sslsocket, {:gen_tcp, #Port<0.49>, :tls_connection, :undefined}, [#PID<0.3756.0>, #PID<0.3755.0>]}}

The code that it is happening in is a GenServer that just loops indefinitely...

  def handle_cast(:fetch, state) do
    response = ExAws.SQS.receive_message(state.queue_url,
      max_number_of_messages: 10,
      wait_time_seconds: 10
    ) |> ExAws.request!()

    Enum.each response.body.messages, fn message ->
      payload = message.body
      |> Jason.decode!()
      |> Map.get("Message")
      |> Jason.decode!()
      Document.TextractJob.perform_async([payload])

      ExAws.SQS.delete_message(state.queue_url, message.receipt_handle)
      |> ExAws.request!()

      Logger.info("Message deleted: #{message.receipt_handle}")
    end

    GenServer.cast(self(), :fetch)

    {:noreply, state}
  end

Thanks for the help!
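
The log lines show {:ssl_closed, socket} tuples landing in the GenServer's mailbox with no matching handle_info/2 clause. A minimal sketch, assuming a hypothetical module PollerSketch that stands in for the poller in the report, of handling that message explicitly:

```elixir
defmodule PollerSketch do
  @moduledoc """
  Hypothetical GenServer standing in for the poller in the report. It only
  shows how a stray {:ssl_closed, socket} message can be handled explicitly.
  """
  use GenServer
  require Logger

  def start_link(state), do: GenServer.start_link(__MODULE__, state)

  @impl true
  def init(state), do: {:ok, state}

  # Notification that a TLS socket was closed. There is nothing to clean up
  # in this process, so log at debug level and keep the state unchanged.
  @impl true
  def handle_info({:ssl_closed, _socket}, state) do
    Logger.debug("ignoring ssl_closed notification")
    {:noreply, state}
  end

  # Anything else is still reported, so real surprises remain visible.
  def handle_info(other, state) do
    Logger.error("unexpected message: #{inspect(other)}")
    {:noreply, state}
  end
end
```

This quiets the error log but does not address why the notifications reach this process in the first place.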

All requests are using Queue Name instead of Queue URL

Right now, the ExAws.SQS library builds its request URIs by appending the queue name to the root path:

%ExAws.Operation.Query{
path: "/" <> queue,
params: params |> Map.put("Action", action_string),
service: :sqs,
action: action,
parser: &ExAws.SQS.Parsers.parse/2
}

Not only is this undocumented behavior (which I'm surprised even works, considering it removes the IAM user ID from the Queue URL, though maybe for legacy reasons), but this is something the AWS documentation explicitly says not to do:

In your system, always store the entire queue URL exactly as Amazon SQS returns it to you when you create the queue (for example, https://sqs.us-east-2.amazonaws.com/123456789012/MyQueue). Don't build the queue URL from its separate components each time you need to specify the queue URL in a request because Amazon SQS can change the components that make up the queue URL.

This is an issue for future compatibility with AWS services (as the documentation explicitly states this can change without warning), but it is also already an issue when using alternative AWS implementations such as localstack (which uses ElasticMQ under the hood).

As mentioned in #6, localstack/ElasticMQ uses the /queue/ prefix before the Queue Name to build the Queue URL (as allowed by the AWS documentation/specification, which is also coherent since they have no notion of IAM users, unlike the official implementation), causing it to only spit out errors when queried by ExAws (other clients work fine).

Crashing when using localstack is quite a big issue, since it prevents local development and testing, not only when using ExAws, but also with frameworks like broadway which rely on ExAws.SQS.

Now, I am aware that switching from names to URLs would be a major, breaking change. I can however think of several solutions:

  • If the queue parameter passed to a request is not a URL, call GetQueueUrl to retrieve the actual URL of the queue.
  • Add an overload of the SQS request functions that accepts a keyword list instead of a string, which could be called as ExAws.SQS.receive_message(queue_url: "[...]") for instance. (This could be combined with the workaround above to preserve full compatibility, while letting callers opt in to avoid the extra HTTP request.)
  • Make the breaking change and release a new major version. I do not know how acceptable this could be, however.
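
The first option depends on telling a full queue URL apart from a bare queue name. A minimal sketch of that detection step, assuming a hypothetical helper module QueuePathSketch (not part of ex_aws_sqs); for a bare name it falls back to the current "/" <> queue behaviour rather than calling GetQueueUrl:

```elixir
defmodule QueuePathSketch do
  @moduledoc """
  Hypothetical helper, not part of ex_aws_sqs: derives the request path from
  either a bare queue name or a full queue URL.
  """

  # Full http(s) URL: keep the path exactly as the service returned it.
  # Anything else: treat it as a queue name and prefix it with "/".
  def path_for(queue) when is_binary(queue) do
    case URI.parse(queue) do
      %URI{scheme: scheme, host: host, path: path}
      when scheme in ["http", "https"] and is_binary(host) and is_binary(path) ->
        path

      _ ->
        "/" <> queue
    end
  end
end
```

With the URL quoted from the AWS documentation above, QueuePathSketch.path_for/1 returns "/123456789012/MyQueue", while a bare "MyQueue" still yields "/MyQueue", so existing callers would keep working.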

Of course there may be other solutions I did not see, but in any case, if a decision is taken I am willing to submit some PRs to resolve this (since I'm hacking around to make it work anyway right now 🙂).

SQS Message Group Id should be parsed as String

Environment

  • Elixir & Erlang versions (elixir --version):
    erlang 22.3.3
    elixir 1.9.4-otp-22

  • ExAws version mix deps |grep ex_aws

>mix deps |grep ex_aws
* ex_aws (Hex package) (mix)
  locked at 2.1.3 (ex_aws) 0bdbe2ae
* ex_aws_dynamo (Hex package) (mix)
  locked at 3.0.2 (ex_aws_dynamo) 7233b402
* ex_aws_sqs (Hex package) (mix)
  locked at 3.2.1 (ex_aws_sqs) ae77e296
  • HTTP client version. IE for hackney do mix deps | grep hackney
* hackney (Hex package) (rebar3)
  locked at 1.16.0 (hackney) 3bf0bebb

Current behavior

ex_aws_sqs currently parses an all-numeric group id (e.g. "111111111") as an integer. The SQS docs say that a message group id should be a String: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html

We have code that downcases the group id while processing messages. Here the group id for the message was "00001111111111111111"; note that the leading zeros have been cut off:

GenServer #PID<0.23161.15> terminating
** (FunctionClauseError) no function clause matching in String.downcase/2
    (elixir) lib/string.ex:744: String.downcase(1111111111111111, :default)

Expected behavior

MessageGroupId values should always be parsed as Strings.
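
Until that is the case, a caller can normalise the value before using it. A minimal sketch, assuming a hypothetical module GroupIdSketch on the caller's side; note that it cannot restore leading zeros, because any integer conversion has already discarded them:

```elixir
defmodule GroupIdSketch do
  @moduledoc """
  Hypothetical caller-side workaround: normalises a message group id that
  may arrive as an integer or as a string.
  """

  # Strings are downcased, as in the report's processing code.
  def normalize(id) when is_binary(id), do: String.downcase(id)

  # Integers are converted back to strings; digits have no case.
  def normalize(id) when is_integer(id), do: Integer.to_string(id)
end

# The information loss itself, shown with the id from the report:
# String.to_integer("00001111111111111111") returns 1111111111111111
```

This avoids the FunctionClauseError but still yields "1111111111111111" rather than the original id, which is why the fix belongs in the parser.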

Error when publishing message to SQS

Environment

  • Elixir & Erlang versions (elixir --version): Elixir 1.10.2 (compiled with Erlang/OTP 22)

  • ExAws version mix deps |grep ex_aws:

  • ex_aws 2.1.2 (Hex package) (mix)
    locked at 2.1.2 (ex_aws) b1d24a9a

  • ex_aws_sqs 3.1.0 (Hex package) (mix)
    locked at 3.1.0 (ex_aws_sqs) ddb26bce

  • HTTP client version. IE for hackney do mix deps | grep hackney: hackney 1.15.2 (Hex package) (rebar3)
    {:ex_aws_sqs, "~> 3.1"},
    {:jason, "~> 1.1"},
    {:poison, "~> 3.0"},
    {:hackney, "~> 1.9"},
    {:sweet_xml, "~> 0.6"},

Current behavior

Sometimes when trying to publish a message, I receive an error.
This is the code I'm using to publish:

body = %{message: message_params, user: %{id: token[:sub]}, msg_id: msg_id}
payload = Jason.encode!(body)
envs[:queue_outbound_name]
|> SQS.send_message(payload)
|> ExAws.request()

Most of the time all messages work, but sometimes I receive:

Elixir.RuntimeError: ** (exit) {:fatal, {:expected_element_start_tag, {:file, :file_name_unknown}, {:line, 1}, {:col, 1}}}
    (xmerl) xmerl_scan.erl:4124: :xmerl_scan.fatal/2
    (xmerl) xmerl_scan.erl:572: :xmerl_scan.scan_document/2
    (xmerl) xmerl_scan.erl:291: :xmerl_scan.string/2
    (sweet_xml) lib/sweet_xml.ex:237: SweetXml.parse/2
    (sweet_xml) lib/sweet_xml.ex:421: SweetXml.xpath/2
    (sweet_xml) lib/sweet_xml.ex:454: SweetXml.xpath/3
    (ex_aws_sqs) lib/ex_aws/sqs/sweet_xml_parser.ex:11: ExAws.SQS.SweetXmlParser.parse/2
    (xxx) web/controllers/message_controller.ex:21: Hermesapi.MessageController.create/2

Expected behavior

The same content sometimes works and sometimes doesn't.

I haven't tried switching the dependency to Saxy. Should I?
