amqpstorm's People

Contributors

aiden0z, bhoehl, bitdeli-chef, carlhoerberg, cp2587, eandersson, jhogg, mikemrm, s-at-ik, smarlowucf, tkram01, visobet, zyguspatryk


amqpstorm's Issues

channel.build_inbound_messages with break_on_empty=True randomly breaks loop

Hi,

  • Context: I am trying to empty a queue from time to time and exit once empty. (e.g: queue has 10k messages)
  • Version: 2.4.2 / python3
  • Issue: With break_on_empty=True set in channel.build_inbound_messages, the loop randomly gets between 1 and 15 messages before exiting, even though there are still thousands of messages in the queue. The queue contains 10k messages and has no publishers, so it is static. Each run of the code below gets a random number of messages and then exits. Without break_on_empty it gets all 10k messages in under 10 seconds, but then it remains connected, which I don't want.

Here is an edited version of the code I used:

from amqpstorm import Connection

queue = 'simple_queue'
server = '127.0.0.1'
username = password = 'guest'

count = 0
with Connection(server, username, password) as connection:
    with connection.channel() as channel:
        # channel.queue.declare(queue)
        channel.basic.qos(100)
        channel.basic.consume(queue=queue, no_ack=False)
        for message in channel.build_inbound_messages(break_on_empty=True):
            if message:
                count += 1
                message.ack()

ref: https://www.amqpstorm.io/api/channel.html#amqpstorm.Channel.build_inbound_messages
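One possible workaround (a sketch, not part of the amqpstorm API) is to poll with channel.basic.get and retry a few empty reads before concluding the queue is drained, since a single empty read can race with messages still in flight from the broker. The helper below takes the getter as a callable so the draining logic is testable without a broker; drain_queue and empty_retries are made-up names:

```python
def drain_queue(get_message, empty_retries=3):
    """Fetch messages until the source looks genuinely empty.

    get_message: zero-argument callable returning a message or None,
    e.g. lambda: channel.basic.get(queue, no_ack=False).
    A single None is retried, because an empty read can be a timing
    artifact rather than a truly empty queue.
    """
    drained = []
    misses = 0
    while misses < empty_retries:
        message = get_message()
        if message is None:
            misses += 1
            continue
        misses = 0
        drained.append(message)
    return drained
```

With amqpstorm, each returned message would then be acknowledged with message.ack() before the connection is closed.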

Thanks for this library.

Timeout Error if Connection Initiated While Server is Starting Up

I'm seeing this in particular with Docker. I run $ docker run -p 5672:5672 rabbitmq, and then, before RabbitMQ has had a chance to start up, I run $ python3 -c "import amqpstorm; amqpstorm.Connection('localhost', 'guest', 'guest')". Thirty seconds later (well after RabbitMQ has finished starting) I get the following error: amqpstorm.exception.AMQPConnectionError: Connection timed out

Replicated in a single command:

$ > docker run -p 5672:5672 -d rabbitmq; sleep 1s; python -c "import amqpstorm; amqpstorm.Connection('localhost', 'guest', 'guest')"
31a4da1f7aa3015550be080af4bc4bb76f0540b323ca0935bbe88f4d1082b1b0
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 65, in __init__
    self.open()
  File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 123, in open
    self._wait_for_connection_to_open()
  File "/usr/lib/python3.5/site-packages/amqpstorm/connection.py", line 227, in _wait_for_connection_to_open
    raise AMQPConnectionError('Connection timed out')
amqpstorm.exception.AMQPConnectionError: Connection timed out
$ >

If I sleep for, say, five seconds after starting the Docker container, then the connection gets made just fine, and immediately.
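Until the connection handles a half-started broker more gracefully, an application-side retry loop works around this. The sketch below injects the connect function so it can be exercised without Docker or RabbitMQ; connect_with_retry is a hypothetical helper, and in real use connect would be something like lambda: amqpstorm.Connection('localhost', 'guest', 'guest'):

```python
import time

def connect_with_retry(connect, attempts=5, delay=1.0):
    """Call connect() until it succeeds or attempts run out.

    connect: zero-argument callable returning a connection and
    raising on failure (e.g. amqpstorm's AMQPConnectionError).
    """
    last_error = None
    for _ in range(attempts):
        try:
            return connect()
        except Exception as why:  # narrow to AMQPConnectionError in real code
            last_error = why
            time.sleep(delay)
    raise last_error
```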

Fanout Exchange

Hello,

I'm trying to set up a fanout exchange to deliver a message to multiple consumers, but didn't find any example for that. I tried to write something similar to the pika example at https://www.rabbitmq.com/tutorials/tutorial-three-python.html. The code doesn't raise any error, but only the first consumer receives the message. Here is the relevant code:

Publisher:

self.connection = amqpstorm.Connection(self.host, self.username, self.password)
self.channel = self.connection.channel()
self.channel.exchange.declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue.declare(self.queue, durable=True)
self.channel.queue.bind(self.queue, exchange=self.exchange)
self.channel.confirm_deliveries()
self.channel.basic.publish(body=body, routing_key=self.queue, mandatory=True, exchange=self.exchange)

Consumer:

self.connection = amqpstorm.Connection(self.host, self.username, self.password)
self.channel = self.connection.channel()
self.channel.exchange.declare(exchange=self.exchange, exchange_type='fanout')
self.channel.queue.declare(self.queue, durable=True)
self.channel.queue.bind(self.queue, exchange=self.exchange)
self.channel.confirm_deliveries()
self.channel.basic.consume(self.queue, no_ack=False)

Is this the correct way to set up a fanout exchange? I tried with no_ack=False and no_ack=True, and different variants of the code above, but I always get the same result: no error, but only the first consumer receives the message.

Thank you in advance for your help.
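For reference, in the RabbitMQ tutorial each consumer declares its own queue (typically exclusive, broker-named) and binds that to the fanout exchange; two consumers reading from one shared queue split its messages round-robin instead of each getting a copy. The toy model below (plain Python, no broker; FanoutExchange is a made-up class for illustration) shows the difference:

```python
# Toy model of AMQP routing: a fanout exchange copies each message
# to every bound queue; consumers on the *same* queue share messages.
class FanoutExchange:
    def __init__(self):
        self.queues = []

    def bind(self, queue):
        self.queues.append(queue)

    def publish(self, body):
        for queue in self.queues:
            queue.append(body)

# One shared queue, two consumers: messages are split round-robin.
exchange = FanoutExchange()
shared = []
exchange.bind(shared)
exchange.publish('hello')
consumer_a = shared[0::2]   # gets every other message
consumer_b = shared[1::2]   # gets the rest

# One queue per consumer: every consumer sees every message.
exchange2 = FanoutExchange()
queue_a, queue_b = [], []
exchange2.bind(queue_a)
exchange2.bind(queue_b)
exchange2.publish('hello')
```

With amqpstorm, that would roughly mean each consumer calling channel.queue.declare(exclusive=True) with an empty name (letting the broker name the queue) and binding that queue to the fanout exchange, rather than all consumers declaring and consuming the same queue.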

Not enough logging when SSL connection cannot be established

Hi,
in io._find_address_and_connect, the except clause (IOError and OSError) is handled silently, and the actual error message is lost.
When neither address is reachable, AMQPConnectionError("Could not connect to host:port") is raised without the original cause.

The issue with this: when, for example, a certificate validation error occurs, it is not printed anywhere (it enters the except clause above).

Could you consider either chaining the original error into AMQPConnectionError, or at least printing it to the logger?
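On Python 3 this can be done with exception chaining, which preserves the original traceback instead of swallowing it. A sketch (not the actual amqpstorm code; the AMQPConnectionError class and connect wrapper here are stand-ins):

```python
class AMQPConnectionError(Exception):
    """Stand-in for amqpstorm.AMQPConnectionError."""

def connect(raw_connect):
    """Wrap a low-level connect call, keeping the real cause.

    raw_connect: zero-argument callable that may raise IOError/OSError
    (e.g. an SSL certificate validation failure).
    """
    try:
        return raw_connect()
    except (IOError, OSError) as why:
        # 'raise ... from why' keeps the socket/SSL error visible
        # in the traceback instead of hiding it.
        raise AMQPConnectionError('Could not connect: %s' % why) from why
```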

Best regards,
Kamil

Threading

Hi,

First, let me thank you for your library. It works really well. I am using it inside Celery tasks to send log messages to an AMQP queue that is further processed by Logstash (so I need thread safety). I was just wondering: why does the library spawn a new thread to handle the AMQP connection? Can't it be done in the main process? I would greatly appreciate it if you could explain the library's workflow from a threading point of view; I don't really understand how RabbitMQ and threading should be linked.

Cheers,

healthchecks currently not supported in management/api.py

I'm currently working on RabbitMQ plugins written in Python to improve OpenStack backend monitoring. Looking through the management API, I saw that api/aliveness-test is already supported, but the healthchecks endpoints are not.

Expected returned HTTP body on a stopped RabbitMQ server

$ curl --silent -k -u monitor:$MonitorPass https://localhost:15671/api/healthchecks/node/rabbit@eulab-os-01-rmq02 | python -m json.tool

{
    "reason": "nodedown",
    "status": "failed"
}

Expected returned HTTP body on a running RabbitMQ server

$ curl --silent -k -u monitor:$MonitorPass https://localhost:15671/api/healthchecks/node/rabbit@eulab-os-01-rmq01 | python -m json.tool

{
    "status": "ok"
}

Official API doc : https://cdn.rawgit.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_10/priv/www/api/index.html
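Support could mirror the aliveness-test helper: hit /api/healthchecks/node (optionally with a node name) and interpret the JSON shown above. The sketch below keeps the HTTP call injectable so the parsing is testable without a broker; node_healthcheck and http_get are made-up names, and in real use http_get would wrap a requests session carrying the management credentials:

```python
def node_healthcheck(http_get, node=None):
    """Return (healthy, reason) from the healthchecks API.

    http_get: callable taking a URL path and returning parsed JSON.
    The endpoint answers {"status": "ok"} when healthy, or
    {"status": "failed", "reason": "..."} otherwise.
    """
    path = '/api/healthchecks/node'
    if node:
        path += '/' + node
    payload = http_get(path)
    if payload.get('status') == 'ok':
        return True, None
    return False, payload.get('reason')
```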

amqpstorm.exception.AMQPConnectionError: [Errno 9] on Sun Solaris 10

Hi, I got this exception:

Traceback (most recent call last):
  File "rpc_server.py", line 10, in <module>
    CONNECTION = amqpstorm.Connection('localhost', 'user', 'pass', port=5672)
  File "/tmp/test/lib/python3.3/site-packages/amqpstorm/connection.py", line 70, in __init__
    self.open()
  File "/tmp/test/lib/python3.3/site-packages/amqpstorm/connection.py", line 190, in open
    self._io.open()
  File "/tmp/test/lib/python3.3/site-packages/amqpstorm/io.py", line 99, in open
    sock_addresses = self._get_socket_addresses()
  File "/tmp/test/lib/python3.3/site-packages/amqpstorm/io.py", line 150, in _get_socket_addresses
    raise AMQPConnectionError(why)
amqpstorm.exception.AMQPConnectionError: [Errno 9] service name not available for the specified socket type

This happened while trying to run your rpc_server.py example on a Sun Solaris 10 VM.

I've roughly solved the problem by adding socket.SOCK_STREAM as the last parameter on line 145 of io.py:

addresses = socket.getaddrinfo(self._parameters['hostname'], self._parameters['port'], family, socket.SOCK_STREAM)
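The effect of that extra argument can be checked on any machine: passing socket.SOCK_STREAM restricts getaddrinfo to TCP-capable results, which is all an AMQP client needs and avoids the lookup for other socket types that fails on Solaris:

```python
import socket

# Without a socktype filter, getaddrinfo may also return entries for
# datagram and raw sockets; with SOCK_STREAM only TCP results come back.
addresses = socket.getaddrinfo('localhost', 5672, socket.AF_UNSPEC,
                               socket.SOCK_STREAM)
```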

I hope it can be useful.

LICENSE file is not distributed in pypi archive

Could you please include the MIT license in the archive so distributors can provide it when installing the package?

Another issue is that the 2.4.1 tag is missing from the git repo, so I can't just quickly swap the PyPI tarball for the GitHub one.

Channels are left idle on Error

We use a version of scalable_consumer where we attempt to re-open the channel on AMQPChannelError rather than destroying the entire connection.

However, this leads to a channel leak: the channels are never removed from the channel list on the server.
I have noticed that on a channel error, CloseOk is never sent. The spec (to me) is unclear as to whether it is necessary, but maybe it is, as the channels are never fully closed on the server. (v3.5.8)

In testing, I bind to a non-existent queue to cause the error.

Best practices on connection timeouts?

Hello again. I'm using AMQPStorm==2.4.0 in production and it's a great little lib - thanks again! However, I'm occasionally (twice a month?) seeing a connection timeout:

amqpstorm.exception.AMQPConnectionError: Connection dead, no heartbeat or data received in >= 20s

What's the best practice here? Should I catch the exception and re-establish the connection at the app level with backoff? Is there some mechanism in amqpstorm that can do this?
Thanks!

Properties and Headers

The values of some properties (e.g. message_id) are returned as byte arrays, and the same goes for header keys; they could be returned as strings for ease of use.

Thanks!
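Until the library does this, a small helper on the application side can normalize them. A sketch (decode_table is a made-up name; it assumes the byte strings are UTF-8):

```python
def decode_table(value):
    """Recursively turn bytes keys/values into str.

    Intended for message properties and headers that arrive as
    byte strings; assumes UTF-8 payloads and leaves other types
    (ints, bools, None) untouched.
    """
    if isinstance(value, bytes):
        return value.decode('utf-8')
    if isinstance(value, dict):
        return {decode_table(k): decode_table(v) for k, v in value.items()}
    if isinstance(value, list):
        return [decode_table(item) for item in value]
    return value
```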

Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

I use RabbitMQ; all the exchanges and consumers were preconfigured with different options. Unfortunately, when I try to consume from a durable queue I get

amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

If I try to declare a queue without specifying any params I get

Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

The only way is to specify all the params, including TTL, auto-delete, etc. I cannot use this library because I need the RabbitMQ structure to be configured outside the client side ;-(

heartbeat error

Dear all,

I have a very simple publish example and am running into this error. Could you advise?

self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)

File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
File "/rtmtb/rtmtb/strategy/strategy_consumer.py", line 382, in work
self.send_to_output(signal=signal, dest_idx=0)
File "/rtmtb/rtmtb/consumer.py", line 67, in send_to_output
self.message.publish(self.destination[dest_idx], signal._asdict())
File "/rtmtb/rtmtb/advisors/message/rabbit_storm.py", line 59, in publish
self.channel.basic.publish(body=content, exchange=destination, routing_key=destination)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/basic.py", line 199, in publish
self._channel.write_frames(frames_out)
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 353, in write_frames
self.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/channel.py", line 183, in check_for_errors
self._connection.check_for_errors()
File "/usr/local/lib/python3.6/site-packages/amqpstorm/connection.py", line 186, in check_for_errors
raise self.exceptions[0]
amqpstorm.exception.AMQPConnectionError: Connection dead, no heartbeat or data received in >= 120s

high memory usage with no_ack=True

Hi

I'm using amqpstorm 2.2.0 to consume messages from RabbitMQ.
I need a high rate of messages. With no_ack=False I increased the QoS value higher and higher to get better message rates. With acks I can reach up to 1.5k msg/s.

Then I tried no_ack=True and got the rates (4k/s) I needed. But the QoS value is ignored, and it seems that amqpstorm reads all messages from RabbitMQ into its local queue.
The memory usage of my script grows until the system crashes (this only happens if the RabbitMQ queue is very big, e.g. after pausing the consumer for a few days).
Is there a way to limit the amount of memory used in amqpstorm's basic.consume?

Chris

Getting Nack or Ack

Hi,

It seems like publish returns True even if the broker explicitly nacks a message. For example, I have a queue with the following settings on a RabbitMQ server:

args = {
    'x-max-length': 10,  # maximum length of queue, will receive nack afterwards
    'x-overflow': 'reject-publish',  # Reject the new publish with a nack
    # 'x-message-ttl': 5000,
    # 'x-dead-letter-exchange': 'amq.direct',
    # 'x-dead-letter-routing-key': self.rpc_queue + '/dlx'
}

But even when the max-length is reached, the publish function returns True. I need this info to reject the request in my Flask app.

I also tried adding a consumer and explicitly nacking the message with requeue=False; still the sender has no idea. BTW, I started from the scalable RPC example, and when the maximum queue length is reached, I want to reject the request.
I have also called channel.confirm_deliveries().

Is there anything (like settings) that I have missed?

Jython 2.7.0 crashing with error

DEBUG:amqpstorm.connection:Connection Opening
Traceback (most recent call last):
  File "C:/Users/eandersson/Documents/GitHub/amqp-storm/examples/simple_publisher.py", line 29, in <module>
    publisher()
  File "C:/Users/eandersson/Documents/GitHub/amqp-storm/examples/simple_publisher.py", line 0, in publisher
  File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\connection.py", line 69, in __init__
    self.open()
  File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\connection.py", line 174, in open
    self._io.open()
  File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 98, in open
    self.socket = self._find_address_and_connect(sock_addresses)
  File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 160, in _find_address_and_connect
    sock = self._create_socket(socket_family=address[0])
  File "C:\Users\eandersson\Documents\GitHub\amqp-storm\amqpstorm\io.py", line 177, in _create_socket
    sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
AttributeError: 'module' object has no attribute 'SOL_TCP'
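Jython's socket module does not expose SOL_TCP; a portable guard falls back to IPPROTO_TCP, which on CPython is the same option level. A sketch of such a guard (set_tcp_nodelay is a made-up helper name):

```python
import socket

def set_tcp_nodelay(sock):
    """Disable Nagle's algorithm portably.

    Jython lacks socket.SOL_TCP, so fall back to IPPROTO_TCP
    (the two are the same TCP option level on CPython).
    """
    level = getattr(socket, 'SOL_TCP', socket.IPPROTO_TCP)
    sock.setsockopt(level, socket.TCP_NODELAY, 1)
```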

Connection timeout to cloudamqp.com

Hi there,
thanks for a great package and implementing heartbeat timer.

I was developing something locally (pointing to local rabbitmq) and it was working perfectly, but after deploying my app to the cloud and pointing to cloudamqp.com, your package cannot connect anymore...

I've tried to compare your package with other clients:

Python 3.6.3 (default, Oct  3 2017, 21:45:48)
[GCC 7.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> RABBIT_DSN = os.getenv('RABBIT_DSN')
>>> import pika
>>> connection = pika.BlockingConnection(pika.URLParameters(RABBIT_DSN))
>>> channel = connection.channel() # worked
>>> import rabbitpy
>>> connection = rabbitpy.Connection(RABBIT_DSN)
>>> channel = connection.channel() # worked
>>> import amqpstorm
>>> connection = amqpstorm.UriConnection(RABBIT_DSN) # timeout :(
command terminated with exit code 137

The last command kills the session (exit code 137).

Traceback (most recent call last):
  File "importer.py", line 240, in <module>
    connection = UriConnection(RABBIT_DSN)
  File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/uri_connection.py", line 46, in __init__
    **kwargs)
  File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 70, in __init__
    self.open()
  File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 193, in open
    self._wait_for_connection_state(state=Stateful.OPEN)
  File "/app/.local/share/virtualenvs/src-sjxfABcx/lib/python3.6/site-packages/amqpstorm/connection.py", line 315, in _wait_for_connection_state
    raise AMQPConnectionError('Connection timed out')
amqpstorm.exception.AMQPConnectionError: Connection timed out

For your information, our deployment runs on Kubernetes on GCP. The connection URL string has the following format:
amqp://user:[email protected]/user

I would appreciate any help :)

Connection close and dispose takes 10 seconds

Good morning,
I see that closing the connection takes 10 seconds when executing this code:

import ssl
import os
import sys
import logging
import time

LOG = logging.getLogger('SM.AmqpRpcClient')


class AmqpRpcClient(object):
    def __init__(self, props):
        self.__props = props

    def exec_rpc(self, exchange="", routing_key="", message_body=None, timeout=15):
        hostname = self.__props["host"]
        ca_path = self.__props["ca_path"]
        
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)  # pylint: disable=no-member
        ssl_context.load_verify_locations(cafile=ca_path)

        import amqpstorm  # pylint: disable=import-error
        with amqpstorm.Connection(
                hostname = hostname,
                port = int(self.__props["port"]),
                username = self.__props["user"],
                password = self.__props["password"],
                virtual_host=self.__props["vhost"],
                ssl=True,
                ssl_options={
                    "server_hostname":hostname,
                    "suppress_ragged_eofs":False,
                    "context":ssl_context},
                heartbeat=60) as connection:
            with connection.channel() as channel:
                result = channel.queue.declare(exclusive=True,auto_delete=True,durable=False)
                response_queue = result['queue']
                message = amqpstorm.Message.create(channel, body=message_body)
                message.reply_to = response_queue
                correlation_id = message.correlation_id
                LOG.info("RPC correlation id: " + correlation_id)
                message.publish(exchange=exchange,routing_key=routing_key)

                i = 0
                while i < timeout:
                    message = channel.basic.get(response_queue)
                    if not message:
                        LOG.debug("waiting for response...")
                        i += 1
                        time.sleep(1)
                    else:
                        if message.correlation_id == correlation_id:
                            message.ack()
                            return message.body
                        else:
                            LOG.info("not my message " + str(message.correlation_id))
                LOG.info("Closing channel...")
            LOG.info("Closing connection...")
        raise TimeoutError("RPCCall: No response returned")

In this case there is no response in the queue, so the TimeoutError is raised 10 seconds after closing the connection.

I am using SSL TLS 1.2 connection.

best regards,
Kamil Gรณrka

New project name?

Hey,

So AMQP-Storm has existed as, well... AMQP-Storm for two years now, and I have been thinking about re-branding the project for a long time. This is primarily because the current name is easily confused with StormMQ. Instead, I would like something more unique.

The implications of changing the name of a project are huge, but the library is extremely stable at the moment and is unlikely to change much over the next year or so, so I think people would have sufficient time to migrate to the new name.

Anyway, I was hoping to get some feedback on the idea, and hopefully some neat name suggestions!

Acknowledge multiple messages

Is acknowledging multiple messages supported? Calling channel.basic.ack(multiple=True) gives me

File "/Users/kramer/Documents/amqpstorm/amqpstorm/basic.py", line 217, in ack
    self._channel.write_frame(ack_frame)
  File "/Users/kramer/Documents/amqpstorm/amqpstorm/channel.py", line 328, in write_frame
    self._connection.write_frame(self.channel_id, frame_out)
  File "/Users/kramer/Documents/amqpstorm/amqpstorm/connection.py", line 206, in write_frame
    frame_data = pamqp_frame.marshal(frame_out, channel_id)
  File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/frame.py", line 103, in marshal
    return _marshal_method_frame(frame_value, channel_id)
  File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/frame.py", line 255, in _marshal_method_frame
    frame_value.marshal())
  File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/specification.py", line 191, in marshal
    output.append(encode.by_type(data_value, data_type))
  File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/encode.py", line 358, in by_type
    return long_long_int(value)
  File "/anaconda/envs/turnip/lib/python3.6/site-packages/pamqp/encode.py", line 127, in long_long_int
    raise TypeError("int type required")
TypeError: int type required

Prefetch seems to be off by 1 when no message body

I know this sounds crazy :)

There seems to be an issue with prefetch/ack/messagebody

If you set QoS to 1
and publish messages with headers but no body (via the management plugin),
1 message is fetched but never hits the handler and hence is never acked.
Using simple_consumer.py against 3.5.7 and 3.6.2.

The use case is that I'm trying to consume RabbitMQ event messages, which have no body.
QoS 1 is used to clearly illustrate the issue; any QoS value seems to result in one unprocessed message.

Insecure SSL currently not supported for ManagementAPI

$ python test-aliveness.py

Connection Error: HTTPSConnectionPool(host='127.0.0.1', port=15671): Max retries exceeded with url: /api/aliveness-test/%2F (Caused by SSLError(SSLError(1, u'[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:579)'),))

From the example

from amqpstorm.management import ApiConnectionError
from amqpstorm.management import ApiError
from amqpstorm.management import ManagementApi

if __name__ == '__main__':
    API = ManagementApi('https://127.0.0.1:15671', 'Username', 'Password')
    try:
        result = API.aliveness_test('/')
        if result['status'] == 'ok':
            print("RabbitMQ is alive!")
        else:
            print("RabbitMQ is not alive! :(")
    except ApiConnectionError as why:
        print('Connection Error: %s' % why)
    except ApiError as why:
        print('ApiError: %s' % why)

Possible solution would be to allow "verify=False" for requests

ttl example

Thank you for your great work. Would you be able to provide an example of how to send a message to a queue with a defined TTL?

I would like to set the TTL on the message itself, since different messages could have different TTLs.
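Per-message TTL in AMQP is set through the message's expiration property, which the protocol encodes as a string holding milliseconds. Below is a sketch; publish_with_ttl is a hypothetical helper, and the publish callable is injected (in real use it would be channel.basic.publish, which accepts a properties dict) so the shape can be checked without a broker:

```python
def publish_with_ttl(publish, body, routing_key, ttl_ms):
    """Publish a message with a per-message TTL.

    publish: callable shaped like channel.basic.publish.
    AMQP expects 'expiration' as a *string* of milliseconds,
    so the value is converted before publishing.
    """
    properties = {'expiration': str(int(ttl_ms))}
    return publish(body=body, routing_key=routing_key,
                   properties=properties)
```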

kind regards

UriConnection SSL Warning with Heartbeat option

Very minor issue.
If you use UriConnection and add a heartbeat querystring option, i.e. ?heartbeat=30, a warning is output: "invalid ssl option: heartbeat".
The heartbeat is still set and nothing seems affected.

Connection was closed by remote server: CONNECTION_FORCED

Hello,

We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

    self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
    self._channel.write_frames(frames_out)
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
    self.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
    self._connection.check_for_errors()
  File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
    raise self.exceptions[0]
AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'

On the rabbitmq server, we have the following logs:
client unexpectedly closed TCP connection

Finally, here is the code I use to create an AMQP socket (this socket is then used by logging to send logs to the queue):

class AMQPStormSocket(object):

    def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                 queue_is_durable, exchange_type, fallback_call):

        # create connection & channel
        self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
        self.channel = self.connection.channel()

        # create an exchange, if needed
        self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
        # create a queue, if needed
        self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
        # bind it
        self.channel.queue.bind(queue=queue, exchange=exchange)

        # needed when publishing
        self.exchange = exchange

        self.fallback_call = fallback_call

    def sendall(self, data):
        try:
            self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
        except Exception as e:
            self.fallback_call(e)

    def close(self):
        try:
            self.channel.close()
            self.connection.close()
        except Exception:
            pass

Do you have an idea of how to fix these errors?

general question regarding memory consumption

hi,

we have run into another problem: for some reason the AMQP stack is consuming ungodly amounts of memory, and we feel we are doing something wrong.

example using tracemalloc:

Top 30 lines
#1: pamqp/frame.py:62: 951.8 MiB
frame_data = data_in[FRAME_HEADER_SIZE:byte_count - 1]
#2: pamqp/decode.py:417: 361.4 MiB
return value.decode('utf-8')
#3: pamqp/decode.py:296: 165.6 MiB
data = {}
#4: pamqp/decode.py:258: 154.9 MiB
return 8, time.gmtime(value[0])
#5: pamqp/header.py:84: 109.0 MiB
self.properties = properties or specification.Basic.Properties()
#6: pamqp/body.py:21: 78.8 MiB
self.value = value
#7: pamqp/header.py:81: 78.8 MiB
self.class_id = None
#8: pamqp/frame.py:135: 57.4 MiB
method = specification.INDEX_MAPPING[method_index]
#9: pamqp/frame.py:157: 40.2 MiB
content_header = header.ContentHeader()
#10: pamqp/frame.py:172: 40.2 MiB
content_body = body.ContentBody()
#11: pamqp/header.py:104: 20.1 MiB
self.body_size) = struct.unpack('>HHQ', data[0:12])
#12: pamqp/decode.py:157: 20.1 MiB
return 8, struct.unpack('>q', value[0:8])[0]
#13: amqpstorm/channel.py:229: 18.9 MiB
self._inbound.append(frame_in)
#14: :525: 9.8 MiB
#15: json/decoder.py:353: 4.6 MiB
obj, end = self.scan_once(s, idx)

This is after running for about 15-20 minutes and receiving maybe a million messages. Any idea why there is such a buildup of memory? It quickly exceeds a couple of GB, and we feel there is some odd issue happening.
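For anyone reproducing this, a report like the one above comes from the stdlib tracemalloc module; a minimal version of the measurement (the allocation here is just a stand-in for the consumer workload):

```python
import tracemalloc

tracemalloc.start()

# Stand-in for the consumer workload: allocate some data so the
# snapshot has something to report.
buffers = [bytearray(1024) for _ in range(1000)]

# Group allocations by source line, largest first, like the
# "Top 30 lines" report quoted above.
snapshot = tracemalloc.take_snapshot()
top_lines = snapshot.statistics('lineno')[:30]
for stat in top_lines:
    print(stat)
```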

kind regards

reached the maximum number of channels raised with closed channels

Hello,

I've run into an issue with creating new channels: I receive "reached the maximum number of channels 65535" when attempting to create a new channel.

After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count down to just open channels should resolve this issue.

I tested with a quick fix

    def _get_next_available_channel_id(self):
        channel_id = len(self._channels) + 1
        active_channels = [
            ch for ch in list(self._channels.values()) if ch and ch.is_open
        ]
        if len(active_channels) >= self.max_allowed_channels:
            raise AMQPConnectionError(
                'reached the maximum number of channels %d' %
                self.max_allowed_channels)
        return channel_id

However, it may be better to just keep an active count.
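Keeping a count, or a pool of free ids, avoids both the mis-count and the linear scan over all channels. A sketch of an id pool, independent of the amqpstorm internals (ChannelIdPool is a made-up class):

```python
class ChannelIdPool:
    """Hand out channel ids and recycle them on close.

    max_channels mirrors the AMQP channel-max limit (65535).
    Ids handed out so far always form a contiguous range, so
    when the free list is empty the next fresh id is count + 1.
    """
    def __init__(self, max_channels=65535):
        self.max_channels = max_channels
        self._in_use = set()
        self._free = []

    def acquire(self):
        if self._free:
            channel_id = self._free.pop()
        elif len(self._in_use) < self.max_channels:
            channel_id = len(self._in_use) + 1
        else:
            raise RuntimeError('reached the maximum number of channels %d'
                               % self.max_channels)
        self._in_use.add(channel_id)
        return channel_id

    def release(self, channel_id):
        self._in_use.discard(channel_id)
        self._free.append(channel_id)
```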

amqpstorm and gevent?

Hello there. First please allow me to extend my gratitude for amqp-storm: I love it and have been using it for a few months.

here's my issue:
I wrote a small connection-pool implementation on top of amqp-storm and was hoping to use that in my gevent-patched Flask webservice. However, the handshake between my client and rabbit fails with timeout.

  1. Is amqp-storm expected to work in gevent-patched environment?
  2. when I removed gevent everything worked just fine.

Below is the output from docker-compose (the server and rabbit each run in docker) - it's interleaved.
As you can see, the connection is opened, but it appears that the handshake fails with a timeout (nothing comes through?)

server-dev_1 | Connection Opening
rabbit_1 |
rabbit_1 | =INFO REPORT==== 23-Jun-2016::14:26:48 ===
rabbit_1 | accepting AMQP connection <0.19907.0> (172.18.0.5:52906 -> 172.18.0.2:5672)
rabbit_1 |
rabbit_1 | =ERROR REPORT==== 23-Jun-2016::14:26:58 ===
rabbit_1 | closing AMQP connection <0.19907.0> (172.18.0.5:52906 -> 172.18.0.2:5672):
rabbit_1 | {handshake_timeout,frame_header}
server-dev_1 | Frame Received: Connection.Start
server-dev_1 | [Errno 104] Connection reset by peer
server-dev_1 | Connection Closing

Many thanks!

SSL client does not validate rabbitmq server certificate

hello,
the library is using a deprecated way of making SSL connections (ssl.wrap_socket) which does not perform hostname or server certificate validation.

This enables a MITM attack, so this is not good.

in io.py in _ssl_wrap_socket instead of

ssl.wrap_socket (...

it should be:

def _ssl_wrap_socket(self, sock):
        """Wrap SSLSocket around the Socket.

        :param socket.socket sock:
        :rtype: SSLSocket
        """
        import ssl
        context = ssl.create_default_context()
        return context.wrap_socket(
            sock, do_handshake_on_connect=True,
            server_hostname='hostname.bla.bla',
            **self._parameters['ssl_options']
        )

This is a dirty solution, but I just wanted to show where the problem is.

Please see: https://docs.python.org/3/library/ssl.html

Best regards,
Kamil Górka

Reconnecting threaded consumers

Is there any guidance on how to reconnect after a lost connection when you have multiple consumers?
At the moment I have a channel per thread.
On the main thread I reconnect on error

if self.connection.is_closed: self.connection.open()

I restart each thread which opens a new channel and consumes.
I notice that after a reconnect it adds another heartbeat checker - each period I see multiple
"Checking for a heartbeat" log lines.

Thanks

Example of Robust multi-channel consumer and publisher

What's the best way to reconnect when a channel errors?
Is it OK to call connection.open() to re-open a closed/errored connection? Same for channel.open?
Is it thread safe for multiple threads to try to re-open the connection on error?
Or do we need to pass around a connection wrapper that locks?
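On the last question, one approach is exactly the locking wrapper described: a guard object shared by all threads, so only the first thread to notice a dead connection re-opens it while the rest block on the lock and then reuse it. This is a sketch, independent of amqpstorm's API except that `is_closed` matches the Connection attribute; `open_connection` is any callable you supply that returns a connected instance.

```python
import threading

class ConnectionGuard:
    def __init__(self, open_connection):
        self._open = open_connection     # callable returning a connection
        self._lock = threading.Lock()
        self._connection = None

    def get(self):
        # Only one thread at a time may inspect/replace the connection.
        with self._lock:
            if self._connection is None or self._connection.is_closed:
                self._connection = self._open()
            return self._connection
```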

no heartbeat messages?

hello and thank you again for this fine library (which I'm once again using).
It appears that no heartbeat messages are being sent by amqpstorm. I've configured it to 10 seconds and the rabbit server clearly states that no heartbeat was received in 10 seconds.

- I'm working with version 2.4.0
- I'm working with python 3.5.1-3 on ubuntu16.04 LTS
- I've yet to really dig in, but it looks like the timer that should send the message is primed but does not 'pop'.

is this a known issue? I've also tried downgrading to 2.1.2 but to no avail.
at the moment I've disabled heartbeat by setting it to '0'.
thanks!
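For reference, a minimal illustration of the "primed but does not pop" symptom: a heartbeat checker is typically a self-rearming timer, and if the tick callback ever fails to re-arm, exactly one interval fires and then heartbeats stop. This is a sketch of the general shape, not amqpstorm's implementation.

```python
import threading

class HeartbeatTimer:
    def __init__(self, interval, send_heartbeat):
        self._interval = interval
        self._send = send_heartbeat
        self._timer = None
        self._running = False

    def start(self):
        self._running = True
        self._arm()

    def _arm(self):
        self._timer = threading.Timer(self._interval, self._tick)
        self._timer.daemon = True
        self._timer.start()

    def _tick(self):
        self._send()
        if self._running:
            self._arm()  # forgetting this re-arm reproduces the symptom

    def stop(self):
        self._running = False
        if self._timer:
            self._timer.cancel()
```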

Testing Consumer Example

More of a request than an actual issue.
I have been using the library successfully, and thank you for that. However, I have now reached the point of writing unit tests for my app, and I noticed there are some utility classes such as FakeChannel, FakeConnection and TestFramework; I have been trying to figure out how, or if, I could use them.

I am looking to test something like the robust consumer from the examples: https://github.com/eandersson/amqpstorm/blob/master/examples/robust_consumer.py - there might be a simpler way to test this and I am missing it.

Some pointers would be great. 👍
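In the meantime, a broker-free way to test a consumer callback is to hand it a fake message object that records ack() calls. The FakeMessage below is hypothetical — it is not one of the library's own test utilities — but it covers the common case of asserting that a callback acks and processes the body.

```python
class FakeMessage:
    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True

def on_message(message):
    """The callback under test: ack the message and return the result."""
    message.ack()
    return message.body.upper()
```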

localhost on OSX

Hi,

In this latest version the URL localhost no longer works on OS X (Ubuntu works fine); I get Connection Refused.
url: amqp://guest:guest@localhost/

Below is the line which raises the exception:

# file io.py
sock.connect(sock_address_tuple[4])
# where sock_address_tuple[4] == ('::1', 5672, 0, 0)

If I use 127.0.0.1 rather than localhost it works well.

Cheers!
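The difference can be seen directly with getaddrinfo: on some systems the IPv6 loopback ('::1', 5672, 0, 0) is returned first, so the connect in io.py targets an address the broker may not be listening on. The order of results is platform dependent, which would explain the macOS vs. Ubuntu difference.

```python
import socket

# Print each resolved address for "localhost"; on an affected system
# the AF_INET6 ('::1', ...) entry appears before the IPv4 one.
for family, socktype, proto, canonname, sockaddr in socket.getaddrinfo(
        'localhost', 5672, proto=socket.IPPROTO_TCP):
    print(family, sockaddr)
```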

Django+Celery, Celery MainProcess: Exception in thread amqpstorm.io self._channels[channel_id].on_frame(frame_in) KeyError 2

Part of Producer code service.py :

class Producer(object):
    def __init__(self, **kwargs):
        self.max_retries = kwargs.get('max_retries', 3)
        self.connection = None
        self.host = kwargs.get('host', '127.0.0.1')
        self.port = kwargs.get('port', 5672)
        self.vhost = kwargs.get('vhost', '/')
        self.user = kwargs.get('user', 'guest')
        self.password = kwargs.get('password', 'guest')
        self.exchange = kwargs.get('exchange', '')
        self.exchange_type = kwargs.get('exchange_type', 'direct')
        self.tms_queue = kwargs.get('tms_queue', '')
        self.account_queue = kwargs.get('account_queue', '')
        self.routing_key = kwargs.get('routing_key', 'no-routing-key')

    def create_connection(self):
        """Create a connection.

        :return:
        """
        attempts = 0
        while True:
            attempts += 1
            try:
                self.connection = Connection(self.host, self.user,
                                             self.password)
                break
            except amqpstorm.AMQPError as why:
                LOGGER.exception(why)
                if self.max_retries and attempts > self.max_retries:
                    break
                time.sleep(min(attempts * 2, 30))
            except KeyboardInterrupt:
                break

    def pub_msg(self, message):
        """Start the Consumers.

        :return:
        """
        if not self.connection:
            self.create_connection()
        try:
            self._pub_msg(message)
        except amqpstorm.AMQPError as why:
            LOGGER.info('Start reconnect')
            LOGGER.exception(why)
            self.create_connection()
            self._pub_msg(message)

        except KeyboardInterrupt:
            self.connection.close()

    def _pub_msg(self, message):
        channel = self.connection.channel()
        # Message Properties.
        properties = {
            'content_type': 'text/plain',
            'delivery_mode': 2
        }

        # Create the message.
        print('Connection is:  ', self.connection, message)
        message = Message.create(channel, message, properties)
        message.publish(self.routing_key, exchange=self.exchange)
        channel.close()

    def setup(self):
        """
        Set up exchange and queue
        """
        self.create_connection()
        channel = self.connection.channel()
        channel.exchange.declare(
            exchange=self.exchange, exchange_type=self.exchange_type)
        channel.queue.declare(self.tms_queue, durable=True)
        channel.queue.declare(self.account_queue, durable=True)
        channel.queue.bind(
            queue=self.tms_queue,
            exchange=self.exchange,
            routing_key=self.routing_key)
        channel.queue.bind(
            queue=self.account_queue,
            exchange=self.exchange,
            routing_key=self.routing_key)
        channel.close()

My producer init in Django project settings.py

from service import Producer
MQ_PRODUCER = Producer(**kwargs)
MQ_PRODUCER.setup()

And in one Django app's tasks.py, I use the MQ_PRODUCER like this:

@app.task(**MQ_TASK_CONFIG)
def send_to_mq(msg):
    """
    Send message to mq
    """
    if settings.MQ_PRODUCER:
        try:
            settings.MQ_PRODUCER.pub_msg(msg)
        except Exception:
            raise
        else:
            return 'success'

[screenshot: Celery worker log showing the KeyError traceback in the amqpstorm.io thread]

When I start Celery for the first time, the KeyError shows up; after the Producer recreates a connection, everything is fine. It takes about one minute before the new connection is created.
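One plausible cause (an assumption — the traceback alone does not confirm it): the connection is created at import time in settings.py, so Celery's forked workers inherit the parent's socket and its channel bookkeeping, and frames arrive for channel ids the child never registered. A common pattern is to create the connection lazily per process; the LazyProducer holder below is a hypothetical sketch, where `factory` is any callable returning a fresh, connected Producer.

```python
import os

class LazyProducer:
    def __init__(self, factory):
        self._factory = factory
        self._pid = None
        self._producer = None

    def get(self):
        # Rebuild the producer after a fork (pid change) or first use,
        # so each worker process owns its own connection.
        pid = os.getpid()
        if self._producer is None or self._pid != pid:
            self._producer = self._factory()
            self._pid = pid
        return self._producer
```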

Consuming multiple queues with one channel

I am porting code from pika to amqpstorm for thread safety. In the existing code there is a channel that consumes multiple queues. With amqpstorm, all messages go to the callback that was registered last:

producer.py

from amqpstorm import Message, UriConnection


msg_properties = {
    'content_type': 'application/json',
    'delivery_mode': 2
}

connection = UriConnection(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)

channel = connection.channel()
channel.confirm_deliveries()

channel.exchange.declare(
    exchange='testing',
    exchange_type='direct',
    durable=True
)

Message.create(
    channel,
    'Test service event.',
    msg_properties
).publish(
    'service_event',
    exchange='testing',
)

Message.create(
    channel,
    'Test message event.',
    msg_properties
).publish(
    'message_event',
    exchange='testing',
)

channel.close()
connection.close()

consumer.py

from amqpstorm import UriConnection


connection = UriConnection(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat=600'
)

channel = connection.channel()
channel.confirm_deliveries()


def service_event(message):
    message.ack()
    print('From service method.')


def message_event(message):
    message.ack()
    print('From message method.')


channel.exchange.declare(
    exchange='testing',
    exchange_type='direct',
    durable=True
)

s_queue = 'service_event.queue'
channel.queue.declare(s_queue)
channel.basic.consume(service_event, s_queue)
channel.queue.bind(
    exchange='testing',
    queue=s_queue,
    routing_key='service_event'
)

m_queue = 'message_event.queue'
channel.queue.declare(m_queue)
channel.basic.consume(message_event, m_queue)
channel.queue.bind(
    exchange='testing',
    queue=m_queue,
    routing_key='message_event'
)

try:
    channel.start_consuming()
except:
    channel.stop_consuming()

channel.close()
connection.close()

Yields:

From message method.
From message method.

But should be:

From service method.
From message method.

Am I doing something wrong here? It seems like one channel should be able to consume multiple queues into the correct callbacks?
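Until that works, a workaround is one dispatching callback that routes on the delivery's routing key. This sketch assumes the message exposes its delivery metadata as a dict with a 'routing_key' entry (which may be bytes, as the logs elsewhere on this page suggest).

```python
HANDLERS = {}

def register(routing_key, handler):
    HANDLERS[routing_key] = handler

def dispatch(message):
    # Route the single consumer callback to the right handler.
    key = message.method.get('routing_key')
    if isinstance(key, bytes):
        key = key.decode('utf-8')
    HANDLERS[key](message)
```

Register `dispatch` once with `channel.basic.consume` and add per-queue handlers via `register`.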

Out of order ack-ing?

Whoooo, more weirdness!

.Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 5
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 6
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
ACK For delivery tag: 7
Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
Disconnecting!
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
Running state: True, thread alive: True, thread id:140037440014080
Joining _inbound_thread. Runstate: %s False

Context:

I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

It appears I'm calling [delivery tag 6].ack() and then [delivery tag 7].ack(), yet somehow the ack for delivery tag 7 is received by the rabbitmq server first, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks the previous tags, so 6 is no longer a valid delivery tag.

I'm working on pulling out a testable example, but it's certainly odd.


Incidentally, the new docs pages are fancypants!

Exception: 'Deliver' object has no attribute 'body_size'

I have been experiencing a strange error; for some messages I get:

'Deliver' object has no attribute 'body_size'
channel.py (line: 253)

I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody


The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.
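For context, a delivery should arrive as Deliver, then ContentHeader (which carries body_size), then ContentBody chunks; a second Deliver landing where the header is expected matches the reported error. A simplified reassembly sketch, with plain tuples standing in for pamqp frame objects:

```python
def build_message(inbound):
    # Expected order: ('Deliver', meta), ('ContentHeader', body_size),
    # then one or more ('ContentBody', bytes) chunks.
    kind, deliver = inbound.pop(0)
    if kind != 'Deliver':
        raise ValueError('expected Deliver, got %s' % kind)
    kind, body_size = inbound.pop(0)
    if kind != 'ContentHeader':
        # Two Deliver frames in a row fail here, like the issue describes.
        raise ValueError('expected ContentHeader, got %s' % kind)
    body = b''
    while len(body) < body_size:
        kind, chunk = inbound.pop(0)
        if kind != 'ContentBody':
            raise ValueError('expected ContentBody, got %s' % kind)
        body += chunk
    return deliver, body
```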

Sometimes raises RuntimeError("cannot join current thread")

Hi. I use your library (version 1.2.7) in several projects and it works well. But some of these projects occasionally print an exception traceback in the log:

Exception in thread amqpstorm.io:
Traceback (most recent call last):
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 763, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 221, in _process_incoming_data
    self.buffer = self.on_read(self.buffer)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 246, in _read_buffer
    self._channel0.on_frame(frame_in)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/channel0.py", line 40, in on_frame
    self._write_frame(Heartbeat())
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/channel0.py", line 104, in _write_frame
    self._connection.write_frame(0, frame_out)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 177, in write_frame
    self.io.write_to_socket(frame_data)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 133, in write_to_socket
    self.on_error(why)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 289, in _handle_socket_error
    self.close()
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/connection.py", line 134, in close
    self.io.close()
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/site-packages/amqpstorm/io.py", line 104, in close
    self.inbound_thread.join(timeout=1)
  File "/home/user/.pyenv/versions/2.7.10/lib/python2.7/threading.py", line 940, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

I can't reproduce this error by hand, but it still occurs. What are the reasons for this behavior? Any suggestions? Thanks
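The traceback ends with the amqpstorm.io thread closing the connection and then trying to join itself. A guard like the following avoids the error — a sketch of the general fix, not the library's actual patch:

```python
import threading

def safe_join(thread, timeout=1):
    # A thread may never join itself; skip the join when the caller
    # is the thread being joined (the situation in the traceback).
    if thread is not threading.current_thread():
        thread.join(timeout=timeout)
```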

Unhandled Frames in 3.6.2

Upgraded to RabbitMQ 3.6.2 and started to see the following errors, which I think lead to using up all the memory on the RabbitMQ server.

[Channel%d] Unhandled Frame: %s -- %s

No code changes, just a server upgrade.
Not sure if this is amqpstorm or RabbitMQ.

Errors in log when closing connection

Hello,
when closing the connection, the following is printed to the console:

Exception in thread amqpstorm.io:
Traceback (most recent call last):
File "C:\python371\lib\threading.py", line 917, in _bootstrap_inner
self.run()
File "C:\python371\lib\threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 224, in _process_incoming_data
self.data_in += self._receive()
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 239, in _receive
data_in = self._read_from_socket()
File "C:\python371\lib\site-packages\amqpstorm\io.py", line 254, in _read_from_socket
return self.socket.read(MAX_FRAME_SIZE)
File "C:\python371\lib\ssl.py", line 908, in read
raise ValueError("Read on closed or unwrapped SSL socket.")
ValueError: Read on closed or unwrapped SSL socket.

Is this correct behaviour?
I attach the sample code

import amqpstorm
import sys
import time
import logging
import threading

rabbit_channel = None
rabbit_connection = None

logging.basicConfig(level = logging.DEBUG)

def on_message(message):
    """When server schedules a check for a client, the check payload is received in this callback"""
    print(message.body)
    
def setupChannel():
    """Sets up a rabbitmq channel. Ensures the subscription exchanges exist. Creates the client's own exchange and receive queue.
    Creates the necessary bindings. Sets up a consumer."""
    global rabbit_channel
    global rabbit_connection
    rabbit_channel = rabbit_connection.channel()

    client_name = "test"
    client_subscription = "test"
    
    # create subscription exchange for this particular client
    rabbit_channel.exchange.declare(exchange=client_subscription, exchange_type='fanout', passive=False, durable=False, auto_delete=False)
    # Declare the queue to which the check requests will be sent
    rabbit_channel.queue.declare(queue=client_name, durable=False, exclusive=False, auto_delete=True)
    rabbit_channel.queue.bind(client_name, client_subscription)
    # set up a consumer
    rabbit_channel.basic.consume(on_message, no_ack=True, exclusive=False, queue=client_name)
    rabbit_channel.start_consuming()

def setupConnection():
    """
    Creates a rabbitmq connection and connects
    """
    global rabbit_connection
    try:
        #we load that lib not from relative path - this will change after package restructure
        #if that is not urgently needed I would prefer this to stay here
        import amqpstorm  # pylint: disable=import-error
    except:
        print("Cannot import \"amqpstorm\" lib! Exit!")
        sys.exit(2)

    try:
        rabbit_connection = amqpstorm.Connection(
            hostname = "test",
            port = 5671,
            username = "test",
            password = "test",
            virtual_host="/test",
            ssl=True)
    except Exception as ex:
        message = "Exception on connection open: " + str(ex)
        print(message)
        raise ConnectionError(str(ex))

def closeConnection():
    """Closes rabbitmq connection and set connection variables to None"""
    try:
        global rabbit_channel
        global rabbit_connection
        if rabbit_channel:
            rabbit_channel.close()
        if rabbit_connection is not None:
            if not rabbit_connection.is_closed:
                print("Attempt to close transport connection")
                rabbit_connection.close()
    except Exception as ex:
        print("Exception on connection close. " + str(ex))
    finally:
        rabbit_connection = None
        rabbit_channel = None

setupConnection()
t = threading.Thread(target=setupChannel)
t.setDaemon(True)
t.start()

time.sleep(5)
rabbit_channel.close()
closeConnection()
t.join()
input()

Publish with confirmation may raise error even when message was successfully published

Description

Hi, I think I've found a problem in delivery confirmation - in the _publish_confirm method. There are three important (and probably buggy) lines:

self._channel.write_frames(frames_out)
result = self._channel.rpc.get_request(confirm_uuid, raw=True)
self._channel.check_for_errors()

The first line sends data to RMQ, the second one tries to get the status of this action, and the third line checks for errors. Where is the bug? Many things may happen between write_frames and the delivery checks. I've briefly checked the implementation of those two methods (rpc.get_request and check_for_errors), and it seems there is other network communication that may fail. This results in a delivery-failed response even though the message was actually published.
This may be really unpleasant for a system that e.g. retries on a failed publish and where duplicate messages are undesirable.
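Until the library can distinguish a failed confirm from a failed confirm check, one application-level mitigation (not an amqpstorm feature) is to publish with a stable message_id so a retried publish can be deduplicated by the consumer:

```python
import uuid

def new_properties():
    # A stable id per logical message lets consumers drop duplicates
    # caused by retrying a publish whose confirm check failed.
    return {'message_id': str(uuid.uuid4()), 'delivery_mode': 2}

seen_ids = set()

def accept(message_id):
    """Return True the first time an id is seen, False for duplicates."""
    if message_id in seen_ids:
        return False
    seen_ids.add(message_id)
    return True
```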

How you can verify this:

I've verified my theory using a docker RMQ, but you can of course use a custom RMQ server and interrupt the network connection somehow, see below.

  • Run the rabbitmq docker image (rabbitmq:3-management), expose ports 5672 and 15672, and run the tests to ensure that it works. (I only ran some tests: amqpstorm/tests/functional/generic_tests.py, using nose.)
  • Tests should pass so we have to hack source files to prove my theory.
  • Go to the mentioned _publish_confirm method and put time.sleep(30) (30, more or less, so you have enough time to disable connection) after write_frames, before delivery checks.
  • I ran only one test now - nosetests amqpstorm/tests/functional/generic_tests.py -m test_functional_publish_and_get_a_large_message. I also wrapped the publish call in this test in a try-except block so I could print the thrown error in the except block. It was AMQPConnectionError("Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
  • Now that the test and source code are adjusted, you can run that one test, optionally open the RMQ management UI in a browser (you have to expose the ports in docker) and wait a few seconds; the published message should appear there.
  • Now stop the running docker container. It has to be stopped before the sleep, put into that publish method, expires.
  • Then wait; when the sleep finishes, it should print the error message.

I hope the description is clear. Otherwise, let me know and I'll try to answer your questions.

Definition of done

An error is raised only when a message was not delivered to the RMQ server.

Cant publish multiple messages

This is a weird one. I've upgraded to 2.2 and am trying to run the following script:

import os
import logging
logging.basicConfig(level=logging.DEBUG)

from amqpstorm import Message
from amqpstorm import UriConnection

keys = ["1","2","3"]
#signer = Signer()
bus = UriConnection("***")
with bus.channel(rpc_timeout=10) as channel:
    channel.confirm_deliveries()
    for key in keys:
        print key
        msg = "My Message"
        #properties = {"headers": {"md5-signature": signer.sign(msg)}}
        properties = {}  # signer headers omitted in this repro
        Message.create(channel, msg, properties).publish(key, exchange="amq.topic")

I get this error on the second publish.

DEBUG:amqpstorm.connection:Connection Opening
DEBUG:amqpstorm.channel0:Frame Received: Connection.Start
DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk
DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune
DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk
DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open
DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk
DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started
DEBUG:amqpstorm.connection:Connection Opened
DEBUG:amqpstorm.connection:Opening a new Channel
DEBUG:amqpstorm.connection:Channel #1 Opened
1
2
DEBUG:amqpstorm.channel:Channel #1 Closing
DEBUG:amqpstorm.channel:Channel #1 Closed
Exception in thread amqpstorm.io:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 754, in run
self.__target(*self.__args, **self.__kwargs)
File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data
if self.poller.is_ready:
File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready
except select.error as why:
AttributeError: 'NoneType' object has no attribute 'error'
