rabbitpy's Introduction

rabbitpy - RabbitMQ simplified

A pure python, thread-safe, minimalistic and Pythonic BSD Licensed AMQP/RabbitMQ library that supports Python 2.7+ and Python 3.4+. rabbitpy aims to provide a simple and easy to use API for interfacing with RabbitMQ, minimizing the programming overhead often found in other libraries.

Installation

rabbitpy may be installed via the Python package index with the tool of your choice. I prefer pip:

pip install rabbitpy

But there's always easy_install:

easy_install rabbitpy

Documentation

Documentation is available on ReadTheDocs.

Requirements

Version History

Available at https://rabbitpy.readthedocs.org/en/latest/history.html

rabbitpy's People

Contributors

boivie, brimcfadden, canardleteer, cenkalti, dave-shawley, dikshajoshi44, fizyk, gmr, jelleaalbers, ju55i, kristaps, lsowen, mdbecker, sunbit, trojkat, vmarkovtsev

rabbitpy's Issues

socket timed out

Hey,

I am testing this library on my local computer, and running it from my local RabbitMQ instance works fine, but trying to reach one of my servers hosted on the interweb gives me a socket timed out exception.

I used the exact code from the example, but changed only the host-name (also tried IP) and two different servers.

DEBUG:rabbitpy.base:Connection setting state to 'Opening'
DEBUG:rabbitpy.base:IO setting state to 'Opening'
DEBUG:rabbitpy.io:Connecting to lab-xxx:5672
Traceback (most recent call last):
  File "/home/eandersson/repo/x_x-master/xxx/horse.py", line 7, in <module>
    with rabbitpy.Connection('amqp://guest:guest@lab-xxx:5672/') as conn:
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy-0.5.0p1-py2.7.egg/rabbitpy/connection.py", line 107, in __init__
    self._connect()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy-0.5.0p1-py2.7.egg/rabbitpy/connection.py", line 247, in _connect
    raise exception
socket.timeout: timed out

Both work fine with pika.

btw your socket send function, shouldn't it either be changed to sendall or slice the frame?

bytes_sent += self._write_frame_data(frame_data)

I haven't really had time yet to look into the code properly, but shouldn't it look something like this?

frame_data[bytes_sent:]
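
A minimal sketch of the slicing approach suggested above, assuming a blocking socket: `socket.send()` may accept only part of the buffer, so the loop resends the unsent tail until everything is written. The names `send_frame` and `PartialSocket` are illustrative and are not part of rabbitpy.

```python
def send_frame(sock, frame_data):
    """Send all of frame_data, resending the unsent tail after partial writes."""
    bytes_sent = 0
    while bytes_sent < len(frame_data):
        # send() returns how many bytes were accepted, possibly fewer than given
        bytes_sent += sock.send(frame_data[bytes_sent:])
    return bytes_sent


class PartialSocket(object):
    """Hypothetical stand-in that accepts at most 4 bytes per send() call."""

    def __init__(self):
        self.received = b''

    def send(self, data):
        chunk = data[:4]
        self.received += chunk
        return len(chunk)


demo = PartialSocket()
send_frame(demo, b'0123456789')
```

On a blocking socket, `socket.sendall()` performs the same loop internally.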

rabbitpy is slow

My test says that rabbitpy is 50 times slower than amqp or pika, with a simple RPC example.

One reason is the insane read buffer size. 16 Bytes? Seriously?

Another is busy-waiting. Python's Queue doesn't really support timeouts, it just fakes them by sleeping incrementally. The correct pattern is to alert the reader by queueing an exception value.
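
The pattern described above could look roughly like this. It is a sketch written for Python 3 (`queue` is named `Queue` on Python 2); `read_frame` and `io_thread` are hypothetical names, not rabbitpy functions. The reader blocks on `get()` with no timeout, and the IO thread wakes it by queueing either a frame or an exception instance.

```python
import queue
import threading


def read_frame(read_queue):
    """Block until the IO thread delivers a frame or an exception."""
    value = read_queue.get()  # no timeout, so no incremental sleeping
    if isinstance(value, Exception):
        raise value  # re-raise the IO thread's error in the caller's thread
    return value


def io_thread(read_queue):
    """Hypothetical IO thread: delivers one frame, then reports a failure."""
    read_queue.put('frame-1')
    read_queue.put(ConnectionError('connection reset'))


frames = queue.Queue()
threading.Thread(target=io_thread, args=(frames,)).start()
first = read_frame(frames)
```

A second `read_frame(frames)` call raises the queued `ConnectionError` in the calling thread, where an ordinary `try`/`except` can handle it.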

However, these are not the only problems. Unfortunately I don't have more time to go bug hunting.

Test code follows.

rabbitmq setup:

rabbitmqctl add_vhost test
rabbitmqctl add_user test test
rabbitmqctl set_permissions -p test test ".*"  ".*"  ".*"

Server:

#!/usr/bin/env python
import amqp

connection = amqp.connection.Connection(host='localhost', userid='test', password='test', login_method='AMQPLAIN', login_response=None, virtual_host='test')

channel = connection.channel()
channel.queue_declare(queue='rpc_queue',auto_delete=False)

import random,sys

def fib(n):
    if n == 0: return 0
    elif n == 1: return 1
    else: return fib(n-1) + fib(n-2)

def on_request(msg):
    body = msg.body
    props = msg.properties
    ch = msg.channel
    delivery_info = msg.delivery_info
    n = int(body)

    response = fib(n)

    msg = amqp.Message(body=str(response), correlation_id=props['correlation_id'])
    ch.basic_publish(msg=msg, exchange='', routing_key=props['reply_to'])
    ch.basic_ack(delivery_tag = delivery_info['delivery_tag'])
    #print "ACK",delivery_info['delivery_tag'],"for",props['correlation_id'],"to",props['reply_to'],"on",ch

channel.basic_qos(prefetch_count=1,prefetch_size=0,a_global=False)
channel.basic_consume(callback=on_request, queue='rpc_queue')
#channel.basic_recover(requeue=True) ## no effect

print " [x] Awaiting RPC requests"
while True: channel.wait()

amqp client:

#!/usr/bin/env python
import gevent; import gevent.monkey; gevent.monkey.patch_all()
from gevent.event import AsyncResult
import amqp
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = amqp.connection.Connection(host='localhost', userid='test', password='test', login_method='AMQPLAIN', login_response=None, virtual_host='test')

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.queue

        self.channel.basic_consume(callback=self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, msg):
        #ch, method, props, body):
        msgid = msg.properties['correlation_id']
        #print "RESP",msgid
        if self.corr_id == msgid:
            self.response.set(msg.body)

    def reader(self):
        while True:
            self.connection.drain_events()

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.response = AsyncResult()
        #print "WANT",self.corr_id
        msg = amqp.Message(reply_to = self.callback_queue, correlation_id = self.corr_id,
            body = str(n))

        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   msg=msg)
        return int(self.response.get())

fibonacci_rpc = FibonacciRpcClient()

j=gevent.spawn(fibonacci_rpc.reader)

#print " [x] Requesting fib(3)"
i=0
while i<1:#000:
    i += 1
    response = fibonacci_rpc.call(2)
    print " [.] Got %r" % (response,)
j.kill()

rabbitpy client:

#!/usr/bin/env python
import gevent; import gevent.monkey; gevent.monkey.patch_all()
from gevent.event import Event#AsyncResult
#import threading; Event=threading.Event
import rabbitpy
import uuid
import time

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = rabbitpy.connection.Connection("amqp://test:test@localhost:5672/test")

        self.channel = self.connection.channel()

        self.callback_queue = rabbitpy.Queue(self.channel)
        self.callback_queue.exclusive = True
        self.callback_queue.auto_delete = True
        self.callback_queue.declare()

        self.received = Event()

    def on_response(self, msg):
        #ch, method, props, body):
        msgid = msg.properties['correlation_id']
        if self.corr_id == msgid:
            self.response = msg.body
            self.received.set()

    def reader(self):
        time.sleep(1)
        i=0
        for msg in self.callback_queue.consume_messages():
            i += 1
            self.on_response(msg)
#            if i == 100: break

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        msg = rabbitpy.Message(self.channel,properties=dict(reply_to = self.callback_queue.name, correlation_id = self.corr_id),
            body_value = str(n))

        msg.publish(exchange='', routing_key='rpc_queue')
        self.received.wait()
        self.received.clear()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

j=gevent.spawn(fibonacci_rpc.reader)
#t=threading.Thread(target=fibonacci_rpc.reader)
#t.start()

print " [x] Requesting fib(3)"
i=0
while i<100:
    i += 1
    response = fibonacci_rpc.call(2)
    #print " [.] Got %r" % (response,)
print "DONE"
j.kill()

Note that the rabbitpy client only loops 100 times, not 1000 times. It still manages to take 7 seconds to complete, vs. 1 second (amqp with 1000 iterations), on my machine.

Version Bump

I've seen changes and a version bump to 0.26.0, but I haven't seen the release published yet. Was this intentional? I was hoping to get a couple of those changes.

IOLoop.run should catch select.error

I'm getting error: (4, 'Interrupted system call') from io.py (IOLoop.run).
It seems that in Python 2.7 (and in Python 3 before 3.3) you must catch select.error to handle this error (see PEP 3151).
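
A hedged sketch of what catching the interrupted call could look like. `poll_once` is a hypothetical helper, not the actual `IOLoop.run` code. Since Python 3.3 `select.error` is an alias of `OSError`, and since Python 3.5 (PEP 475) interrupted calls are retried automatically, so the handler mainly matters on older interpreters.

```python
import errno
import select
import socket


def poll_once(read_fds, write_fds, timeout):
    """Run one select() pass, treating EINTR as 'nothing ready'."""
    try:
        return select.select(read_fds, write_fds, [], timeout)
    except select.error as error:
        # Python 2's select.error has no errno attribute; the code is args[0]
        code = getattr(error, 'errno', None) or error.args[0]
        if code == errno.EINTR:
            return [], [], []
        raise


left, right = socket.socketpair()
left.send(b'x')
readable, _, _ = poll_once([right], [], 1.0)
```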

Consider making IO threads daemonic

Right now IO threads will prevent an otherwise finished program from exiting, if some rabbitpy.Connection is left open. I know that the context manager usage of a connection would prevent this, but it feels clumsy for anything other than very short scripts.

Is there any particular reason why the IO threads are not daemonic? If not, maybe the IO threads could be made daemonic by default, or at least have an option to do so?
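
For illustration, an opt-in daemon flag could be as small as the sketch below. `start_io_thread` is a hypothetical helper; how rabbitpy actually creates its IO thread is not shown in this report.

```python
import threading


def start_io_thread(target, daemon=True):
    """Start a worker thread that does not block interpreter exit when daemon=True."""
    thread = threading.Thread(target=target, name='io')
    thread.daemon = daemon  # must be set before start()
    thread.start()
    return thread


stop = threading.Event()
worker = start_io_thread(stop.wait)
is_daemon = worker.daemon
stop.set()
worker.join(1.0)
```

The trade-off is that a daemon thread is stopped abruptly at interpreter exit, so frames still waiting to be written may be lost unless the connection is closed explicitly.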

rabbitpy hangs after rabbitmq restart

Hi,

I've been trying to make rabbitpy restart consuming after a rabbitmq restart but there seem to be some problems:

A simple consumer:

while True:
    with rabbitpy.Connection(connection_parameters) as connection:
        with connection.channel() as channel:
            for message in rabbitpy.Queue(channel, 'queue_name'):
                message.ack()
                print(message.json())

After a rabbitmq server restart:

2014-10-22 11:50:09,487:0x1acfe90-io:Socket error: None
None
Exception in thread 0x1acfe90-io:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 811, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 317, in run
    self._loop.run()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 164, in run
    self._poll()
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 200, in _poll
    self._data.error_callback(None)
  File "/usr/lib/python2.7/site-packages/rabbitpy/io.py", line 328, in on_error
    args = [self._args['host'], self._args['port'], exception[1]]
TypeError: 'NoneType' object has no attribute '__getitem__'

And the script hangs...
It looks like it is blocked in the channel close process.

The first workaround I found was to catch AMQPConnectionForced (because it seems to be the exception that is raised in rabbitpy when rabbitmq server restarts) and to mark the channel as closed manually.

while True:
    print 'yes'
    with rabbitpy.Connection(connection_parameters) as connection:
        with connection.channel() as channel:
            try:
                for message in rabbitpy.Queue(channel, 'heartbeat'):
                    message.ack()
                    print(message.json())
            except rabbitpy.exceptions.AMQPConnectionForced:
                print 'Catched AMQPConnectionForced, closing channel'
                channel._set_state(channel.CLOSED)

Actually, it works when the AMQPConnectionForced is raised, but sometimes it is not, so the script still hangs.

I'm not very good at python and thus I definitely could be wrong but I think io.py on_error function needs to be modified into something like this:

328,330c328,333
<         args = [self._args['host'], self._args['port'], exception[1]]
<         if self._channels[0][0].open:
<             self._exceptions.put(exceptions.ConnectionResetException(*args))

---
>         if exception:
>             args = [self._args['host'], self._args['port'], exception[1]]
>             if self._channels[0][0].open:
>                 self._exceptions.put(exceptions.ConnectionResetException(*args))
>             else:
>                 self._exceptions.put(exceptions.ConnectionException(*args))
332c335,336
<             self._exceptions.put(exceptions.ConnectionException(*args))

---
>             args = [self._args['host'], self._args['port'], None]
>             self._exceptions.put(exceptions.AMQPConnectionForced(*args))

Am I right, or am I doing something wrong?

Thank you in advance for your responses.

Cheers,

Damien

Uncatchable exception

I have a processing loop with rabbitpy calls inside try-except block:

try:
    ...
    message = queue.get()
    <processing_block>
    tx = rabbitpy.Tx(message.channel)
    tx.select()
    pub_message.publish('my-exchange', 'my-key')
    message.ack()
    tx.commit()
    ...
except (rabbitpy.exceptions.ActionException,
        rabbitpy.exceptions.ConnectionResetException,
        rabbitpy.exceptions.ConnectionException) as e:
    print "Error."

Recently I noticed that during execution of <processing_block> an exception is raised that is not caught by the try-except block.

Exception in thread 0x3480ef0-io:
Traceback (most recent call last):
  File "C:\Python34\lib\threading.py", line 920, in _bootstrap_inner
    self.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 348, in run
    self._loop.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 190, in run
    self._poll()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 235, in _poll
    self._read()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 249, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 386, in on_read
    self._channels[0][0].on_frame(value[1])
  File "C:\Python34\lib\site-packages\rabbitpy\channel0.py", line 114, in on_frame
    self.write_frame(heartbeat.Heartbeat())
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 231, in write_frame
    self._check_for_exceptions()
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 285, in _check_for_exceptions
    raise exception
rabbitpy.exceptions.ConnectionResetException: Connection was reset at socket level

This makes execution hang in the tx.select() method later.

SSL validation parameter error

When using ssl_validation=required in the connection URL, I get the following exception:

  File "$path/rabbitpy/connection.py", line 477, in _process_url
    'ssl_validation': self._get_ssl_validation(query_values),
  File "$path//python/rabbitpy/connection.py", line 343, in _get_ssl_validation
    return SSL_VERSION_MAP[validation]
KeyError: 'required'

Replacing the line with "return SSL_CERT_MAP[validation]" fixes it.
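
A sketch of the lookup with the certificate map in place of the version map. Only the `'required'` key appears in the report; the `'ignore'` and `'optional'` keys, and the assumption that `query_values` has the dict-of-lists shape produced by `parse_qs`, are illustrative.

```python
import ssl

SSL_CERT_MAP = {
    'ignore': ssl.CERT_NONE,        # assumed key name
    'optional': ssl.CERT_OPTIONAL,  # assumed key name
    'required': ssl.CERT_REQUIRED,  # the value used in the report
}


def get_ssl_validation(query_values):
    """Map the ssl_validation query value to an ssl.CERT_* constant."""
    validation = query_values.get('ssl_validation', [None])[0]
    if validation is None:
        return None
    if validation not in SSL_CERT_MAP:
        raise ValueError('Unsupported ssl_validation value: %r' % validation)
    return SSL_CERT_MAP[validation]


mode = get_ssl_validation({'ssl_validation': ['required']})
```

Raising `ValueError` with the offending value gives a clearer message than the bare `KeyError` in the traceback.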

Marshalling chokes on unicode

Hi there,

when publish()ing a rabbitpy.Message that contains unicode, marshalling the message breaks:

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        message = rabbitpy.Message(channel, u'unicode shot the marshal')
        message.publish('test-exchange', 'test-routing-key')

causes

Exception in thread 0x7ff03cafafd0-io:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 810, in __bootstrap_inner
    self.run()
  File "<...>/local/lib/python2.7/site-packages/rabbitpy/io.py", line 215, in run
    self._loop.run()
  File "<...>/local/lib/python2.7/site-packages/rabbitpy/io.py", line 60, in run
    self._poll()
  File "<...>/local/lib/python2.7/site-packages/rabbitpy/io.py", line 103, in _poll
    self._write()
  File "<...>/local/lib/python2.7/site-packages/rabbitpy/io.py", line 134, in _write
    self._write_frame(data[0], data[1])
  File "<...>/local/lib/python2.7/site-packages/rabbitpy/io.py", line 143, in _write_frame
    frame_data = frame.marshal(value, channel)
  File "<...>/local/lib/python2.7/site-packages/pamqp/frame.py", line 107, in marshal
    return _marshal_content_body_frame(frame_value, channel_id)
  File "<...>/local/lib/python2.7/site-packages/pamqp/frame.py", line 229, in _marshal_content_body_frame
    frame_value.marshal())
  File "<...>/local/lib/python2.7/site-packages/pamqp/frame.py", line 217, in _marshal
    return ''.join([header, payload, FRAME_END_CHAR])
UnicodeDecodeError: 'ascii' codec can't decode byte 0xce in position 0: ordinal not in range(128)

Marshalling a str works as expected. So does handling unicode strings under Python 3. Am I holding it wrong?

PS This rather seems to be an issue in pamqp; I'm posting it here since I found it simplest to reproduce using rabbitpy. Is that ok with you, or do you want me to move the ticket? Thx.
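
The byte 0xce in the error is the AMQP frame-end octet, which suggests that Python 2 tried to decode the byte string `FRAME_END_CHAR` as ASCII when joining it with a unicode payload. A possible workaround is to encode text before it reaches the marshaller, sketched below. `to_bytes` and `frame_payload` are hypothetical helpers, the `b'HDR'` header is a placeholder, and UTF-8 is an assumed encoding.

```python
FRAME_END = b'\xce'  # AMQP 0-9-1 frame-end octet


def to_bytes(value, encoding='utf-8'):
    """Return value unchanged if it is already bytes, else encode it."""
    if isinstance(value, bytes):
        return value
    return value.encode(encoding)


def frame_payload(header, payload):
    """Join header, payload and frame end as bytes only."""
    return b''.join([header, to_bytes(payload), FRAME_END])


frame = frame_payload(b'HDR', u'unicode shot the marshal')
```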

Is rabbitpy thread-safe?

I see that you are opening a separate IO thread per connection and using a Queue for sending methods over the connection. Does this mean rabbitpy is thread-safe?

RemoteClosedException occurs from multiple threads

Exception like "NOT_ALLOWED - attempt to reuse consumer tag 'rabbitpy.1.139714802256464'"

Exception in thread Thread-101:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "rabbitpy_ios.py", line 28, in main
    for message in queue.consume_messages(False, 10):
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/amqp_queue.py", line 110, in consume_messages
    with self.consumer(no_ack, prefetch) as consumer:
  File "/usr/lib/python2.7/contextlib.py", line 17, in __enter__
    return self.gen.next()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/amqp_queue.py", line 92, in consumer
    self.channel.prefetch_count(prefetch)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/channel.py", line 178, in prefetch_count
    global_=all_channels))
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 181, in rpc
    return self._wait_on_frame(frame_value.valid_responses)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 254, in _wait_on_frame
    value = self._read_from_queue()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 207, in _read_from_queue
    self._check_for_exceptions()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 199, in _check_for_exceptions
    raise exception
RemoteClosedException: (530, "NOT_ALLOWED - attempt to reuse consumer tag 'rabbitpy.1.139714802256464'")

ValueError when consuming messages from federated queue

I am using rabbitpy 0.15.1 with a RabbitMQ broker version 3.2.3 with federated queues configured.

When trying to consume a message that has been federated from another broker the consumer fails with a ValueError and the message "Unknown type 'l'". My guess is that it might have something to do with the structure of the "x-received-from" header.

Below is an example of what a message that generates the error looks like in the RabbitMQ management console:

[Image: message]
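
For context, RabbitMQ's field-table encoding uses the type tag `l` for a signed 64-bit integer. Whether that is the field involved here is not confirmed by the report. A decoder for that tag might look like the sketch below, where `decode_long_long`, `decode_field` and `FIELD_DECODERS` are hypothetical names rather than pamqp's API.

```python
import struct


def decode_long_long(data, offset=0):
    """Decode a big-endian signed 64-bit integer; return (value, new_offset)."""
    (value,) = struct.unpack_from('>q', data, offset)
    return value, offset + 8


FIELD_DECODERS = {b'l': decode_long_long}  # other type tags omitted


def decode_field(data, offset=0):
    """Dispatch on the one-byte type tag that precedes each field value."""
    tag = data[offset:offset + 1]
    if tag not in FIELD_DECODERS:
        raise ValueError('Unknown type %r' % tag)
    return FIELD_DECODERS[tag](data, offset + 1)


value, end = decode_field(b'l' + struct.pack('>q', 1400000000))
```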

ValueError when using rabbitpy.publish() simple method

It seems rabbitpy.publish() isn't working for me as I keep getting the following stack trace when trying to publish anything (from different programs altogether):

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 309, in run
    self._write_frame(*value)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 338, in _write_frame
    frame_data = frame.marshal(frame_value, channel_id)
  File "/usr/local/lib/python2.7/dist-packages/pamqp/frame.py", line 105, in marshal
    return _marshal_content_header_frame(frame_value, channel_id)
  File "/usr/local/lib/python2.7/dist-packages/pamqp/frame.py", line 241, in _marshal_content_header_frame
    frame_value.marshal())
  File "/usr/local/lib/python2.7/dist-packages/pamqp/header.py", line 99, in marshal
    self.body_size) + self.properties.marshal()
  File "/usr/local/lib/python2.7/dist-packages/pamqp/specification.py", line 272, in marshal
    property_value))
  File "/usr/local/lib/python2.7/dist-packages/pamqp/specification.py", line 256, in encode_property
    getattr(self.__class__, property_name))
  File "/usr/local/lib/python2.7/dist-packages/pamqp/codec/encode.py", line 287, in by_type
    return octet(value)
  File "/usr/local/lib/python2.7/dist-packages/pamqp/codec/encode.py", line 120, in octet
    raise ValueError("int type required")
ValueError: int type required

Am trying to publish using the following code:

rabbitpy.publish('amqp://user:pass@server:5672/%2f', exchange='myexchange', routing_key='myroutingkey', body='message', properties={'delivery_mode': '2'})

All of this on pre-0.7.0. From 0.7.0 onwards I just get a hang on program. No output until I Ctrl+C:

^CERROR:rabbitpy.connection:Connection context manager closed on exception
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/simple.py", line 85, in publish
    msg.publish(exchange, routing_key or '')
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/channel.py", line 78, in __exit__
    self.close()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/channel.py", line 86, in close
    self._wait_on_frame(specification.Channel.CloseOk)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 210, in _wait_on_frame
    value = self._read_from_queue()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/base.py", line 161, in _read_from_queue
    return self._read_queue.get(True, 0.01)
  File "/usr/lib/python2.7/Queue.py", line 177, in get
    self.not_empty.wait(remaining)
  File "/usr/lib/python2.7/threading.py", line 262, in wait
    _sleep(delay)
KeyboardInterrupt
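
The first traceback ends in pamqp's `octet` encoder rejecting a non-integer, and the publishing code passes `delivery_mode` as the string `'2'`. AMQP defines delivery-mode as an octet, so passing the integer `2` may avoid that ValueError; whether it is related to the later hang is not established. A small sketch of coercing such properties before publishing, with `normalize_properties` as a hypothetical helper:

```python
# Basic properties that AMQP 0-9-1 defines as octets
INTEGER_PROPERTIES = ('delivery_mode', 'priority')


def normalize_properties(properties):
    """Return a copy of properties with octet-typed values coerced to int."""
    normalized = dict(properties)
    for name in INTEGER_PROPERTIES:
        if normalized.get(name) is not None:
            normalized[name] = int(normalized[name])
    return normalized


properties = normalize_properties({'delivery_mode': '2'})
```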

Polling is too frequent when using queue.consume_messages()

After rewriting my code to use rabbitpy instead of Pika 0.9.8, I have been seeing a lot of polling of the CPUs. Although it's not necessarily putting load on the system itself, the fact that it polls the CPU so often is causing my Amazon instances to be throttled back by AWS automatically, for requesting too much of the ECUs.

For the record, it's the same behavior seen using Pika 0.9.9+.

I am actually looking for ideas on how to implement a consumer without running into this issue. Below is an excerpt of the strace over a few minutes when the queues were empty (our AMQP is populated in batches):

select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
select(0, NULL, NULL, NULL, {0, 100000}) = 0 (Timeout)
(...)
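
The strace shows `select()` called with no file descriptors and a 100 ms timeout, which is effectively a timed sleep repeated ten times per second. One alternative, sketched below under the assumption that the consumer can wait directly on the connection's socket, is to pass the socket to `select()` so the process sleeps until data actually arrives. `wait_for_data` is a hypothetical helper, and the socket pair stands in for a broker connection.

```python
import select
import socket


def wait_for_data(sock, timeout=None):
    """Sleep until sock is readable; timeout=None blocks indefinitely."""
    readable, _, _ = select.select([sock], [], [], timeout)
    return bool(readable)


producer, consumer = socket.socketpair()
idle = wait_for_data(consumer, timeout=0.05)  # nothing has been sent yet
producer.send(b'message')
ready = wait_for_data(consumer)               # returns once data is pending
```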

Headers with foreign characters like 'ö' cause server internal errors, but only if the body does not contain a foreign character

Every time I publish a message with a header containing foreign characters like 'åäö' using rabbitpy, the RabbitMQ server drops the connection with an INTERNAL_ERROR, but only if the message body does not contain a foreign character.

# Does not work
message1 = Message(channel, 'foo', {'headers': {'bar': 'baz ö'}})
message1.publish(exchange, '', True)

# Works
message2 = Message(channel, 'foo', {'headers': {'bar': 'baz'}})
message2.publish(exchange, '', True)

# Also works
message3 = Message(channel,'foo ö', {'headers': {'bar': 'baz ö'}})
message3.publish(exchange, '', True)

Traceback

  File "D:/Workspace/Eco/render-invoice/main.py", line 127, in render_invoice
    message2.publish(exchange, '', True)
  File "C:\Python34\lib\site-packages\rabbitpy\message.py", line 270, in publish
    response = self.channel._wait_for_confirmation()
  File "C:\Python34\lib\site-packages\rabbitpy\channel.py", line 394, in _wait_for_confirmation
    specification.Basic.Nack])
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 307, in _wait_on_frame
    self._check_for_exceptions()
  File "C:\Python34\lib\site-packages\rabbitpy\base.py", line 218, in _check_for_exceptions
    raise exception
rabbitpy.exceptions.AMQPInternalError: b'INTERNAL_ERROR'

rabbitpy crashes if the server has frame_max set to 0

If the RabbitMQ server has frame_max set to 0, rabbitpy crashes at line 256 of message.py:

Exception in thread Thread-2:
Traceback (most recent call last):
[SNIP]
  File "C:\Python34\lib\site-packages\rabbitpy\message.py", line 256, in publish
    float(self.channel.maximum_frame_size)))
ZeroDivisionError: float division by zero
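
In AMQP 0-9-1 a negotiated frame-max of 0 means that no specific limit is imposed, so the frame count needs a special case instead of a division. A simplified sketch follows; `body_frame_count` is a hypothetical name, and the calculation ignores the per-frame overhead that a real implementation subtracts from the frame size.

```python
import math


def body_frame_count(body_size, maximum_frame_size):
    """Number of content-body frames needed; frame size 0 means unlimited."""
    if body_size == 0:
        return 0
    if not maximum_frame_size:
        return 1  # no limit: the whole body fits in one frame
    return int(math.ceil(body_size / float(maximum_frame_size)))


unlimited = body_frame_count(10, 0)
limited = body_frame_count(10, 4)
```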

Query string is not parsed

Although the comments in connection.py say:

The URL format is as follows:       
    amqp[s]://username:password@host:port/virtual_host[?query string]

It seems to me that rabbitpy always fails to parse the query string:

>>> from rabbitpy import utils
>>> utils.urlparse("amqp://guest:guest@localhost:5672/%2F?heartbeat_interval=1")
ParseResult(scheme='amqp', netloc='guest:guest@localhost:5672', path='/%2F?heartbeat_interval=1', params='', query='', fragment='')

I think you missed the trick of replacing amqp with http in the URL (so that urlparse module would be happy to parse it) which was already used in pika.
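
A sketch of the scheme-swapping trick, written for Python 3's `urllib.parse`. `parse_amqp_url` and the keys of the returned dict are illustrative and are not rabbitpy's actual implementation. Recent Python versions split the query for any scheme, so the swap mainly matters on older interpreters.

```python
from urllib.parse import parse_qs, unquote, urlparse


def parse_amqp_url(url):
    """Parse an amqp[s] URL by presenting it to urlparse as http."""
    scheme, separator, rest = url.partition('://')
    if not separator or scheme not in ('amqp', 'amqps'):
        raise ValueError('Expected an amqp:// or amqps:// URL: %r' % url)
    parsed = urlparse('http://' + rest)
    return {
        'scheme': scheme,
        'username': parsed.username,
        'password': parsed.password,
        'host': parsed.hostname,
        'port': parsed.port,
        'virtual_host': unquote(parsed.path[1:]),  # '%2F' decodes to '/'
        'query': parse_qs(parsed.query),
    }


result = parse_amqp_url(
    'amqp://guest:guest@localhost:5672/%2F?heartbeat_interval=1')
```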

Non-blocking socket operation throws exception and then exception handler throws exception

This occurred on Windows with Python 3.2. I am using rabbitpy 0.19.0. After this error occurs, it doesn't seem possible to recover.

Traceback (most recent call last):
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 145, in _write_frame
    self._data.fd.sendall(frame_data)
socket.error: [Errno 10035] A non-blocking socket operation could not be completed immediately
Exception in thread 0x3148310-io:
Traceback (most recent call last):
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 145, in _write_frame
    self._data.fd.sendall(frame_data)
socket.error: [Errno 10035] A non-blocking socket operation could not be completed immediately

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Python32\lib\threading.py", line 740, in _bootstrap_inner
    self.run()
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 215, in run
    self._loop.run()
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 60, in run
    self._poll()
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 103, in _poll
    self._write()
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 134, in _write
    self._write_frame(data[0], data[1])
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 150, in _write_frame
    self._data.error_callback(exception)
  File "C:\Python32\lib\site-packages\rabbitpy\io.py", line 226, in on_error
    args = [self._args['host'], self._args['port'], exception[1]]
TypeError: 'error' object does not support indexing
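
The second failure comes from indexing the exception object (`exception[1]`), which Python 3 exceptions do not support. A defensive way to extract the message, sketched with a hypothetical helper name, is to read `args` instead and to tolerate a missing exception:

```python
def describe_socket_error(exception):
    """Return a readable message for an exception that may be None."""
    if exception is None:
        return None
    args = getattr(exception, 'args', ())
    if len(args) > 1:
        return args[1]  # (errno, message) style errors
    return str(exception)


message = describe_socket_error(OSError(10035, 'operation would block'))
```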

SSL ca_certs

Using ssl_cacert triggers the following exception in io.py:

 File "$path/rabbitpy/io.py", line 473, in _create_socket
    return ssl.wrap_socket(**kwargs)
TypeError: wrap_socket() got an unexpected keyword argument 'cacerts'

Replacing the last entry with " 'ca_certs': 'ssl_cacert'}" in

    SSL_KWARGS = {'keyfile': 'ssl_key',
                  'certfile': 'ssl_cert',
                  'cert_reqs': 'ssl_validation',
                  'ssl_version': 'ssl_version',
                  'cacerts': 'ssl_cacert'}

fixes it.
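
As a sketch of how such a mapping is typically consumed, the snippet below builds keyword arguments from a connection argument dict using the corrected key. build_ssl_kwargs is a hypothetical helper and the file name is a placeholder; rabbitpy's actual code may differ.

```python
# Keyword argument name -> connection argument name, with the corrected key.
SSL_KWARGS = {'keyfile': 'ssl_key',
              'certfile': 'ssl_cert',
              'cert_reqs': 'ssl_validation',
              'ssl_version': 'ssl_version',
              'ca_certs': 'ssl_cacert'}


def build_ssl_kwargs(args):
    """Hypothetical helper: keep only the SSL options that were provided."""
    return {kwarg: args[name]
            for kwarg, name in SSL_KWARGS.items()
            if args.get(name) is not None}


print(build_ssl_kwargs({'ssl_cacert': 'ca.pem', 'ssl_key': None}))
```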

Exception raised when exiting consumer generator

Traceback (most recent call last):
  File "consumer.py", line 39, in <module>
    print('Published response to %s' % message.properties['message_id'])
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/connection.py", line 126, in __exit__
    raise exc_type(exc_val)
ValueError: ('Unknown frame type to wait for: %r', <class 'pamqp.specification.Deliver'>)

Publish failing with large messages

I'm having issues when publishing large messages to an exchange. With rabbitpy I get this exception:
rabbitpy.exceptions.ConnectionResetException: ('127.0.0.1', 5672, 'Resource temporarily unavailable')

In the RabbitMQ log, I see this:
=INFO REPORT==== 19-May-2014::02:21:44 ===
accepted TCP connection on [::]:5672 from 127.0.0.1:39884

=INFO REPORT==== 19-May-2014::02:21:44 ===
starting TCP connection <0.25899.261> from 127.0.0.1:39884

=WARNING REPORT==== 19-May-2014::02:21:49 ===
exception on TCP connection <0.25899.261> from 127.0.0.1:39884
connection_closed_abruptly

=INFO REPORT==== 19-May-2014::02:21:49 ===
closing TCP connection <0.25899.261> from 127.0.0.1:39884

I tried resending the message with the Pika driver and it worked fine. I would consider switching to Pika, but I would prefer to use a thread-safe driver. I've been using rabbitpy for the past two months on a project and have only recently run into these problems with larger messages. The message I'm trying to send is around 1 million characters long. The error has persisted for the past few days, and the longer it persists, the larger this specific message gets, since it retries every 15 minutes with new data appended. Any help would be appreciated.
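
"Resource temporarily unavailable" is the text for EAGAIN, which a non-blocking socket reports when the kernel send buffer is full. The sketch below shows one generic way to write a large payload on a non-blocking socket by waiting for writability and resuming. It is not rabbitpy's implementation; send_all_nonblocking and its timeout parameter are assumptions for illustration.

```python
import select
import socket


def send_all_nonblocking(sock, data, timeout=5.0):
    """Send all of data on a non-blocking socket, resuming after EAGAIN."""
    view = memoryview(data)
    sent_total = 0
    while sent_total < len(view):
        try:
            sent_total += sock.send(view[sent_total:])
        except (BlockingIOError, InterruptedError):
            # The send buffer is full: wait until the socket is writable again.
            _, writable, _ = select.select([], [sock], [], timeout)
            if not writable:
                raise socket.timeout('socket not writable after %s s' % timeout)
    return sent_total
```

Small messages usually fit in the send buffer in one call, which would be consistent with only large messages failing.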

Header keys are always raw bytes and never strings when running on Python 3

This might be intentional but in that case it should probably be mentioned somewhere in the documentation.

Right now we have to remember to put a b prefix before the string key every time we access a header value while running Python 3.

message.properties.get('headers').get(b'foo').decode()

The same thing applies to the message.properties values. They are always raw bytes and never strings, even if the field description is short_str.

The problem seems to be in pamqp decode.py:370, _maybe_utf8().

When _maybe_utf8 is called while running Python 3, it always returns bytes and never str or unicode. Since _maybe_utf8 is used while decoding table keys, it also causes the header keys to be bytes.
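
Until the decoder changes, one possible workaround on the consumer side is to decode the header table after it is received. This is a sketch only; decode_headers is a hypothetical helper, and it assumes that values which are not valid UTF-8 should be left as bytes.

```python
def decode_headers(value, encoding='utf-8'):
    """Recursively decode bytes keys and values, leaving binary data alone."""
    if isinstance(value, bytes):
        try:
            return value.decode(encoding)
        except UnicodeDecodeError:
            return value  # genuinely binary, keep as bytes
    if isinstance(value, dict):
        return {decode_headers(key, encoding): decode_headers(item, encoding)
                for key, item in value.items()}
    if isinstance(value, list):
        return [decode_headers(item, encoding) for item in value]
    return value


headers = decode_headers({b'foo': b'bar', b'retries': 3})
print(headers['foo'])
```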

Can I stop consuming gracefully?

I am using the following code to consume messages from a queue:

for message in queue:
    message.pprint(True)
    message.ack()
logger.info("worker has ended")  # never reached

Is there a way to stop consuming messages (in another thread or a signal handler)?

What I want to do is stop consuming messages when my program receives a SIGUSR1 signal from the OS, finish the current task (if there is one), ACK the last message, and then exit the program.
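
The requested behaviour can be sketched with a flag that the signal handler sets and the loop checks after each message. The names below (stop_requested, request_stop, install_handler, consume) are hypothetical, and messages stands in for any iterable of messages. The flag is only checked between messages, so this does not address how to wake an iterator that is blocked waiting for the next delivery.

```python
import signal
import threading

stop_requested = threading.Event()


def request_stop(signum, frame):
    """Signal handler: only record the request, do no real work here."""
    stop_requested.set()


def install_handler():
    """Register the handler; call from the main thread (SIGUSR1 is Unix-only)."""
    signal.signal(signal.SIGUSR1, request_stop)


def consume(messages, handle):
    """Handle messages until the iterable ends or a stop is requested."""
    processed = 0
    for message in messages:
        handle(message)  # finish the current task, including its ack
        processed += 1
        if stop_requested.is_set():
            break
    return processed
```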

Connections are not thread-safe

Hello,

Based on https://rabbitpy.readthedocs.org/en/latest/threads.html, it seems that Connection objects are thread-safe but Channel objects aren't.
However, if several threads use the same connection to create channels simultaneously, some channels will eventually get the same channel id, and a thread will block waiting for a frame that another thread has already received:

#4 file '/usr/lib64/python2.6/threading.py', in 'wait'
#8 file '/usr/lib64/python2.6/Queue.py', in 'get'
#12 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_read_from_queue'
#15 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_wait_on_frame'
#19 file '/usr/lib/python2.6/site-packages/rabbitpy/channel.py', in 'open'
#22 file '/usr/lib/python2.6/site-packages/rabbitpy/connection.py', in 'channel'

For example, the code below may lead to blocked threads:

def publisher(connection, name):
    for index in range(0, MESSAGE_COUNT):
        with connection.channel() as channel:
            print('[%s] message %s' % (name, index))
            message = rabbitpy.Message(channel, '[%s] Message #%i' % (name, index))
            message.publish(EXCHANGE, ROUTING_KEY)

with rabbitpy.Connection(<>) as connection:
    with connection.channel() as channel:
        exchange = rabbitpy.Exchange(channel, EXCHANGE)
        exchange.declare()

    # Start the publisher thread
    publisher_threads = {}
    for i in xrange(5):
        publisher_threads[i] = threading.Thread(target=publisher, kwargs={'connection': connection, 'name': i})
        publisher_threads[i].start()

    for v in publisher_threads.values():
        v.join()

Printing the channel id helps when reproducing this issue:

diff --git a/rabbitpy/connection.py b/rabbitpy/connection.py
index e5b97ce..7a7e18a 100644
--- a/rabbitpy/connection.py
+++ b/rabbitpy/connection.py
@@ -157,6 +157,7 @@ class Connection(base.StatefulObject):

         """
         channel_id = self._get_next_channel_id()
+        print('channel id = %s' % channel_id)
         channel_frames = queue.Queue()
         self._channels[channel_id] = channel.Channel(channel_id,
                                                      self._events,

busy-wait hampers ability to massively multithread client

I found rabbitpy while searching for a python AMQP library that can handle a multithreaded client. I really like rabbitpy's interface -- very easy to use. However, I found that it uses far more CPU in the idle state than I had anticipated.

I'm writing a massively multithreaded production load simulator, using rabbitmq to distribute jobs to workers (see lexelby/apiary if you're curious). I need thousands of simultaneous clients, all occasionally grabbing a message from the queue and then alternating between sleeping and sending requests to the system under test. I'm currently using 100 processes with 50 threads apiece. Trust me, Python and Linux are both up to this kind of workload :)

I found that after setting up my 5000 threads, each process was using about 15% of a CPU core just waiting for messages. This is due to the 0.1 second timeout in AMQPChannel._read_from_queue. Removing that timeout brings CPU usage down to near 0.

I'm guessing that the 0.1 second timeout is in there so that it can respond quickly to changes in self.closed and self.state -- perhaps we can come up with a way to wait on both the queue and a semaphore simultaneously to avoid having to busy-wait.

For that matter, if the queue holds one or more messages that don't match the criteria, the thread will busily cycle through them (this seems to be by design). It's conceivable that two threads could repeatedly grab each other's message and spin forever, or at least for a while. Perhaps some kind of short randomized sleep might work.
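
One alternative to polling with a timeout is to wake blocked readers by putting a sentinel on the queue when the channel closes, so a reader can block indefinitely without missing the state change. The sketch below shows the idea; FrameInbox is a hypothetical class and does not reflect rabbitpy's internals.

```python
import queue
import threading

_CLOSED = object()  # sentinel that wakes blocked readers


class FrameInbox(object):
    def __init__(self):
        self._queue = queue.Queue()
        self.closed = False

    def put(self, frame):
        self._queue.put(frame)

    def close(self):
        self.closed = True
        self._queue.put(_CLOSED)

    def get(self):
        """Block with no timeout; return None once the inbox is closed."""
        item = self._queue.get()
        if item is _CLOSED:
            self._queue.put(_CLOSED)  # leave it in place for other waiters
            return None
        return item
```

A blocked get() consumes no CPU, so thousands of idle threads should cost very little with this approach.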

more documentation around thread-safety

After reading through the code, my understanding is that rabbitpy is only thread-safe provided that each thread has its own Channel. This makes perfect sense and matches other libraries' models, from what I've seen. I think it'd be useful to surface this requirement in the documentation, because two simultaneous publishes on the same channel in different threads would break the protocol.

Missing one parameter when raising RemoteClosedChannelException in channel.py

The __repr__ method of RemoteClosedChannelException requires that three arguments were passed when the exception was raised:

class RemoteClosedChannelException(Exception):
    def __repr__(self):
        return 'Channel %i was closed by the remote server (%i): %s' % \
               (self.args[0], self.args[1], self.args[2])

However, in the on_remote_close function of rabbitpy.channel, only two arguments are passed:

    def on_remote_close(self, frame_value):
        """Invoked by rabbitpy.connection.Connection when a remote channel close
        is issued.

        :param frame_value: The Channel.Close method frame
        :type frame_value: pamqp.specification.Channel.Close

        """
        self._set_state(self.REMOTE_CLOSED)
        raise exceptions.RemoteClosedChannelException(frame_value.reply_code,
                                                      frame_value.reply_text)

I think the last line should be

        raise exceptions.RemoteClosedChannelException(self._channel_id,
                                                      frame_value.reply_code,
                                                      frame_value.reply_text)
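
The mismatch can be reproduced in isolation. The class below copies the __repr__ shown above; the channel id, reply code, and reply text are illustrative values only.

```python
class RemoteClosedChannelException(Exception):
    def __repr__(self):
        return 'Channel %i was closed by the remote server (%i): %s' % \
               (self.args[0], self.args[1], self.args[2])


two_args = RemoteClosedChannelException(404, 'NOT_FOUND')
three_args = RemoteClosedChannelException(1, 404, 'NOT_FOUND')

try:
    repr(two_args)
except IndexError as error:
    # self.args[2] does not exist when only two arguments were passed.
    print('two arguments fail:', error)

print(repr(three_args))
```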

heartbeat defaulting to None resulting in 'ValueError: int type required', et al.

I'm trying to run the first example from the README:

rabbitpy.publish(
    'amqp://guest:guest@localhost:5672/%2f',
    exchange='test',
    routing_key='example',
    body='This is my test message'
)

This throws the following exception:

Traceback (most recent call last):
  File "./my_script.py", line 221, in <module>
    body='This is my test message',
  File "./env/lib/python2.6/site-packages/rabbitpy/simple.py", line 80, in publish
    with connection.Connection(uri) as conn:
  File "./env/lib/python2.6/site-packages/rabbitpy/connection.py", line 107, in __init__
    self._connect()
  File "./env/lib/python2.6/site-packages/rabbitpy/connection.py", line 244, in _connect
    raise exception
ValueError: int type required

This stacktrace wasn't too helpful, so I modified all the code that calls put on self._exceptions (channel0.py and io.py) so that it also pushes traceback.format_exc() onto the queue. Then I modified line 242 of connection.py so that it printed out the original stacktrace. This resulted in the following stacktrace:

Traceback (most recent call last):
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 211, in run
    self._run()
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 464, in _run
    self._loop.run()
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 60, in run
    self._poll()
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 109, in _poll
    self._write()
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 143, in _write
    self._write_frame(data[0], data[1])
  File "./env/lib/python2.6/site-packages/rabbitpy/io.py", line 157, in _write_frame
    frame_data = frame.marshal(value, channel)
  File "./env/lib/python2.6/site-packages/pamqp/frame.py", line 110, in marshal
    return _marshal_method_frame(frame_value, channel_id)
  File "./env/lib/python2.6/site-packages/pamqp/frame.py", line 262, in _marshal_method_frame
    frame_value.marshal())
  File "./env/lib/python2.6/site-packages/pamqp/specification.py", line 183, in marshal
    output.append(codec.encode.by_type(data_value, data_type))
  File "./env/lib/python2.6/site-packages/pamqp/codec/encode.py", line 305, in by_type
    return short_int(value)
  File "./env/lib/python2.6/site-packages/pamqp/codec/encode.py", line 145, in short_int
    raise ValueError("int type required")
ValueError: int type required

I added print statements to all the lines in this stack trace and observed that the short_int function was being passed None. Furthermore, the None value originated from the heartbeat value of the value variable in _write_frame in io.py.

I found two workarounds to this issue. The first was to explicitly set the heartbeat_interval in the connection uri as follows:

rabbitpy.publish(
    'amqp://guest:guest@localhost:5672/%2f?heartbeat_interval=0',
    exchange='test',
    routing_key='example',
    body='This is my test message'
)

The second was to modify connection.py so that heartbeat does not default to None.
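
A sketch of the second idea: resolve the heartbeat to an integer before it reaches the encoder. heartbeat_from_uri is a hypothetical function, and the default of 0 is an assumption taken from the first workaround rather than a documented rabbitpy default.

```python
from urllib.parse import parse_qs, urlparse

DEFAULT_HEARTBEAT = 0  # assumed default, mirroring ?heartbeat_interval=0


def heartbeat_from_uri(uri, default=DEFAULT_HEARTBEAT):
    """Return heartbeat_interval from the URI query as an int, never None."""
    values = parse_qs(urlparse(uri).query).get('heartbeat_interval')
    if not values:
        return default
    return int(values[0])


print(heartbeat_from_uri('amqp://guest:guest@localhost:5672/%2f'))
```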

Unfortunately, I ran into another exception after fixing the first one:

Traceback (most recent call last):
  File "./my_script.py", line 221, in <module>
    body='This is my test message',
  File "./env/lib/python2.6/site-packages/rabbitpy/simple.py", line 87, in publish
    msg.publish(exchange, routing_key or '')
  File "./env/lib/python2.6/site-packages/rabbitpy/connection.py", line 134, in __exit__
    self.close()
  File "./env/lib/python2.6/site-packages/rabbitpy/connection.py", line 173, in close
    self._shutdown_connection()
  File "./env/lib/python2.6/site-packages/rabbitpy/connection.py", line 472, in _shutdown_connection
    self._channel0.close()
  File "./env/lib/python2.6/site-packages/rabbitpy/channel0.py", line 58, in close
    self._write_frame(specification.Connection.Close())
  File "./env/lib/python2.6/site-packages/rabbitpy/base.py", line 301, in _write_frame
    raise exception
pamqp.specification.AMQPNotFound: <pamqp.specification.Channel.Close object at 0x101bd2350>

Simple publish() crashing

For reasons unknown, doing a rabbitpy.publish() is failing for me:

Exception in thread Thread-2:
Traceback (most recent call last):
 File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
   self.run()
 File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 358, in run
   self._write_frame(*value)
 File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 388, in _write_frame
   frame_data = frame.marshal(frame_value, channel_id)
 File "/usr/local/lib/python2.7/dist-packages/pamqp/frame.py", line 107, in marshal
   return _marshal_content_body_frame(frame_value, channel_id)
 File "/usr/local/lib/python2.7/dist-packages/pamqp/frame.py", line 229, in _marshal_content_body_frame
   frame_value.marshal())
 File "/usr/local/lib/python2.7/dist-packages/pamqp/frame.py", line 217, in _marshal
   return ''.join([header, payload, FRAME_END_CHAR])
UnicodeDecodeError: 'ascii' codec can't decode byte 0xaf in position 6: ordinal not in range(128)

Command used:
rabbitpy.publish('amqp_uri', exchange='topic_exchange', routing_key='key', body=message, properties={'delivery_mode': 2})

This causes the program to hang (not exit). Only a kill on the PID quits the process.

Attempting to do a message.ack() causes an exception

I presume this has to do with trying to ack() a message that has already been acked. Can't quite figure out why.

Unhandled exception: 'module' object has no attribute 'AMQP_EXCEPTIONS'
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 206, in run
    self._run()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 454, in _run
    self._loop.run()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 60, in run
    self._poll()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 107, in _poll
    self._read()
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 122, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 274, in on_read
    self._remote_close_channel(value[0], value[1])
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/io.py", line 434, in _remote_close_channel
    self._channels[channel_id][0].on_remote_close(frame_value)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/channel.py", line 155, in on_remote_close
    if value.reply_code in exceptions.AMQP_EXCEPTIONS:
AttributeError: 'module' object has no attribute 'AMQP_EXCEPTIONS'

If the message body is a memoryview rabbitpy crashes in utils.py

If the message body is a memoryview rabbitpy crashes at utils.py, line 22

# Works
message1 = Message(channel, io.BytesIO(b'test').getvalue(), {'headers': {'bar': 'baz ö'}})
message1.publish(exchange, '')

# Does not work
message2 = Message(channel, io.BytesIO(b'test').getbuffer(), {'headers': {'bar': 'baz ö'}})
message2.publish(exchange, '')
  File "C:\Python34\lib\site-packages\rabbitpy\message.py", line 245, in publish
    payload = utils.maybe_utf8_encode(self.body)
  File "C:\Python34\lib\site-packages\rabbitpy\utils.py", line 22, in maybe_utf8_encode
    return bytes(value, 'utf-8')
TypeError: encoding or errors without a string argument

This worked in 0.22.0
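
The TypeError arises because bytes(value, 'utf-8') only accepts a str as its first argument. A minimal sketch of a more tolerant conversion follows; to_bytes is a hypothetical name and is not the utils.maybe_utf8_encode implementation.

```python
import io


def to_bytes(value, encoding='utf-8'):
    """Convert str, bytes, bytearray, or memoryview to bytes."""
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode(encoding)
    if isinstance(value, (bytearray, memoryview)):
        return bytes(value)  # copies the buffer contents, no encoding involved
    raise TypeError('unsupported body type: %s' % type(value).__name__)


print(to_bytes(io.BytesIO(b'test').getbuffer()))
```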

RabbitMQ java client encodes byte array using 'x' as the field value which causes rabbitpy/pamqp to crash

We recently tried sending a few byte arrays using the message header instead of the body. Everything works fine using a Java client, but our rabbitpy/pamqp consumer crashes with a ValueError: Unknown type: b'x' error.

Exception in thread 0x2b552f0-io:
Traceback (most recent call last):
  File "C:\Python34\lib\threading.py", line 921, in _bootstrap_inner
    self.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 348, in run
    self._loop.run()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 190, in run
    self._poll()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 235, in _poll
    self._read()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 249, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 375, in on_read
    value = self._read_frame()
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 532, in _read_frame
    self._buffer, chan_id, value = self._get_frame_from_str(self._buffer)
  File "C:\Python34\lib\site-packages\rabbitpy\io.py", line 507, in _get_frame_from_str
    byte_count, channel_id, frame_in = frame.unmarshal(value)
  File "C:\Python34\lib\site-packages\pamqp\frame.py", line 79, in unmarshal
    return byte_count, channel_id, _unmarshal_header_frame(frame_data)
  File "C:\Python34\lib\site-packages\pamqp\frame.py", line 172, in _unmarshal_header_frame
    content_header.unmarshal(frame_data)
  File "C:\Python34\lib\site-packages\pamqp\header.py", line 119, in unmarshal
    self.properties.unmarshal(flags, data[12 + offset:])
  File "C:\Python34\lib\site-packages\pamqp\specification.py", line 305, in unmarshal
    consumed, value = codec.decode.by_type(data, data_type)
  File "C:\Python34\lib\site-packages\pamqp\codec\decode3.py", line 344, in by_type
    return field_table(value)
  File "C:\Python34\lib\site-packages\pamqp\codec\decode3.py", line 251, in field_table
    consumed, result = _embedded_value(value[offset:])
  File "C:\Python34\lib\site-packages\pamqp\codec\decode3.py", line 304, in _embedded_value
    raise ValueError('Unknown type: %r' % value[:1])
ValueError: Unknown type: b'x'

The official RabbitMQ Java client encodes byte arrays with an 'x' field value, which rabbitpy/pamqp does not seem to handle.

To be honest, I suspect the Java client is not following the AMQP 0.9.1 specification in this case. According to page 31 of the specification, the only valid field values are:

field-value = 't' boolean
     'b' short-short-int
     'B' short-short-uint
     'U' short-int
     'u' short-uint
     'I' long-int
     'i' long-uint
     'L' long-long-int
     'l' long-long-uint
     'f' float
     'd' double
     'D' decimal-value
     's' short-string
     'S' long-string
     'A' field-array
     'T' timestamp
     'F' field-table
     'V'

Should I create a patch or do you prefer to follow the official specification?
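
If a patch is wanted, a decoder for the extra type could look roughly like the sketch below. It assumes that 'x' uses the same layout as a long string (a 4-byte big-endian length followed by the raw bytes); that layout should be checked against the Java client before relying on it. decode_byte_array is a hypothetical function, not part of pamqp.

```python
import struct


def decode_byte_array(data):
    """Decode an 'x' field value; return (bytes consumed, payload)."""
    if data[:1] != b'x':
        raise ValueError('Unknown type: %r' % data[:1])
    # Assumed layout: type octet, 4-byte unsigned big-endian length, payload.
    (length,) = struct.unpack('>I', data[1:5])
    payload = data[5:5 + length]
    if len(payload) != length:
        raise ValueError('truncated byte array')
    return 5 + length, payload


encoded = b'x' + struct.pack('>I', 3) + b'abc'
print(decode_byte_array(encoded))
```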

Not compatible with Python 3.3

Running the consumer.py or publisher.py examples with Python 3.3 throws the error below. I tried the exact same scripts with Python 2.7 and it worked fine.

Traceback (most recent call last):
  File "pub.py", line 6, in <module>
    with rabbitpy.Connection('amqp://guest:guest@localhost:5672/%2f') as conn:
  File "/usr/local/lib/python3.3/dist-packages/rabbitpy-0.15.0-py3.3.egg/rabbitpy/connection.py", line 110, in __init__
    self._connect()
  File "/usr/local/lib/python3.3/dist-packages/rabbitpy-0.15.0-py3.3.egg/rabbitpy/connection.py", line 231, in _connect
    self._channel0.start()
  File "/usr/local/lib/python3.3/dist-packages/rabbitpy-0.15.0-py3.3.egg/rabbitpy/channel0.py", line 113, in start
    self._write_protocol_header()
  File "/usr/local/lib/python3.3/dist-packages/rabbitpy-0.15.0-py3.3.egg/rabbitpy/channel0.py", line 242, in _write_protocol_header
    self._write_frame(header.ProtocolHeader())
  File "/usr/local/lib/python3.3/dist-packages/rabbitpy-0.15.0-py3.3.egg/rabbitpy/base.py", line 320, in _write_frame
    self._write_trigger.send('0')
TypeError: 'str' does not support the buffer interface

Message bodies sometimes get mixed up when publisher confirms are enabled

I have a queue of messages, each with a JSON string body and a few headers.

When I called enable_publisher_confirms, I suddenly got a lot of json.loads(message.body.decode()) errors. When I dumped the message contents, I noticed that two message bodies had been concatenated.

When I removed enable_publisher_confirms, the problem disappeared.

with connection.channel() as channel:
    channel.enable_publisher_confirms()
    for message in Queue(channel, 'foo'):   
        body = message.body.decode()
        json.loads(body)

This is a short dev script so I never do message.ack(). Can that be what triggers this bug?

missing pamqp

Self-explanatory console dump:

(env)(master)edd@edd-mac:~/workspace> pip install rabbitpy
Downloading/unpacking rabbitpy
  Downloading rabbitpy-0.9.0.tar.gz
  Running setup.py egg_info for package rabbitpy
    Traceback (most recent call last):
      File "<string>", line 16, in <module>
      File "/Users/edd/workspace/env/build/rabbitpy/setup.py", line 4, in <module>
        from rabbitpy import __version__
      File "rabbitpy/__init__.py", line 3, in <module>
        from rabbitpy.connection import Connection
      File "rabbitpy/connection.py", line 16, in <module>
        from rabbitpy import base
      File "rabbitpy/base.py", line 10, in <module>
        from pamqp import specification
    ImportError: No module named pamqp
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 16, in <module>
      File "/Users/edd/workspace/env/build/rabbitpy/setup.py", line 4, in <module>
        from rabbitpy import __version__
      File "rabbitpy/__init__.py", line 3, in <module>
        from rabbitpy.connection import Connection
      File "rabbitpy/connection.py", line 16, in <module>
        from rabbitpy import base
      File "rabbitpy/base.py", line 10, in <module>
        from pamqp import specification
    ImportError: No module named pamqp

----------------------------------------
Command python setup.py egg_info failed with error code 1 in /Users/edd/workspace/env/build/rabbitpy
Storing complete log in /var/folders/ws/v1_tybq94rvckz6fh5z76vq80000gn/T/tmpRM37li
(env)(master)edd@edd-mac:~/workspace> pip search paqmp
(env)(master)edd@edd-mac:~/workspace> pip install pamqp
Downloading/unpacking pamqp
  Downloading pamqp-1.2.1.tar.gz
  Running setup.py egg_info for package pamqp

Installing collected packages: pamqp
  Running setup.py install for pamqp
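
The traceback shows setup.py importing rabbitpy to read __version__, which in turn imports pamqp before pip has installed it. A common way around this is to read the version string from the source file as text instead of importing the package. The sketch below shows that idiom; read_version is a hypothetical helper, not the project's actual fix.

```python
import re


def read_version(path):
    """Extract __version__ from a source file without importing it."""
    with open(path) as handle:
        match = re.search(r"^__version__\s*=\s*['\"]([^'\"]+)['\"]",
                          handle.read(), re.MULTILINE)
    if match is None:
        raise RuntimeError('no __version__ found in %s' % path)
    return match.group(1)
```

In setup.py this would be called with the path to rabbitpy/__init__.py, so no third-party import is needed at install time.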

rabbitpy (still) failing with large messages

Seeing errors like:
('excited-hawkbit-1097.bigwig.lshift.net', 10000, 'Resource temporarily unavailable')

We're running rabbitpy 0.19. Large messages seem to be triggering the problem. Smallest message I've seen for which it fails is about 41k. We publish many thousands of smaller messages per hour with no problems.

Getting pamqp.specification.AMQPNotFound when exchange is not created.

I'm running the 1st example from the README:

rabbitpy.publish('amqp://guest:guest@localhost:5672/%2f',
                     exchange='test',
                     routing_key='example',
                     body='This is my test message')

And I'm getting this exception:

Traceback (most recent call last):
  File "test_rabbitpy.py", line 5, in <module>
    body='This is my test message')
  File "env/lib/python2.6/site-packages/rabbitpy/simple.py", line 87, in publish
    msg.publish(exchange, routing_key or '')
  File "env/lib/python2.6/site-packages/rabbitpy/connection.py", line 134, in __exit__
    self.close()
  File "env/lib/python2.6/site-packages/rabbitpy/connection.py", line 173, in close
    self._shutdown_connection()
  File "env/lib/python2.6/site-packages/rabbitpy/connection.py", line 468, in _shutdown_connection
    self._channel0.close()
  File "env/lib/python2.6/site-packages/rabbitpy/channel0.py", line 58, in close
    self._write_frame(specification.Connection.Close())
  File "env/lib/python2.6/site-packages/rabbitpy/base.py", line 309, in _write_frame
    raise exception
pamqp.specification.AMQPNotFound: <pamqp.specification.Channel.Close object at 0x110690fd0>

I'm running the example in a virtualenv with python version 2.6.8

The version of rabbitmq I'm using is RabbitMQ 2.8.4 on Erlang R14A

The version of pamqp is 1.3.0

I've observed that this only happens if the exchange (in this case, test) doesn't already exist. If I create the exchange first, the exception isn't thrown and the message is properly delivered.

AttributeError: 'Parsed' object has no attribute 'port'

Hey,

I tried out the latest master, and I am unable to open a connection on Python 2.7.4 (Linux Mint 15).

This is some basic testing code, and it worked before I upgraded from 0.9.

connection_string = 'amqp://{0}:{1}@{2}:{3}/%2f'.format(self.username,
                                                                self.password,
                                                                self.host,
                                                                self.port)

self.connection = rabbitpy.Connection(connection_string)

The error looks like this.

File "/usr/local/lib/python2.7/dist-packages/rabbitpy/connection.py", line 85, in __init__
    self._args = self._process_url(url or self.DEFAULT_URL)
  File "/usr/local/lib/python2.7/dist-packages/rabbitpy/connection.py", line 428, in _process_url
    port = parsed.port or (self.PORTS['amqps'] if parsed.scheme == 'amqps'
AttributeError: 'Parsed' object has no attribute 'port'

I assume that it is supposed to be listed in the namedtuple, or removed/modified from the connection.py code.

Parsed = collections.namedtuple('Parsed',
                                'scheme,netloc,url,params,query,fragment')
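
For comparison, a minimal sketch of a parsed tuple that does carry the port, built on the standard library's urlparse. The field list and the parse_url name are assumptions for illustration and differ from the namedtuple above; 5672 and 5671 are the standard AMQP and AMQPS ports.

```python
import collections
from urllib.parse import urlparse

PORTS = {'amqp': 5672, 'amqps': 5671}

Parsed = collections.namedtuple('Parsed', 'scheme,hostname,port,path,query')


def parse_url(url):
    """Parse an AMQP URL, falling back to the scheme's default port."""
    result = urlparse(url)
    port = result.port or PORTS[result.scheme]
    return Parsed(result.scheme, result.hostname, port,
                  result.path, result.query)


print(parse_url('amqp://guest:guest@localhost:5672/%2f').port)
```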

socket.socketpair doesn't exist in Windows

Sorry to be the guy who brings up Windows, but I have been experimenting with replacing some Pika code on a client's Windows server with Rabbitpy code.

Lines 179-180 of io.py raise the following exception:

AttributeError: 'module' object has no attribute 'socketpair'

Sure enough, socket.socketpair really doesn't exist in Windows; from what I understand of the small paragraph in the Python docs, it's somewhat UNIX-specific.

Is there a way to implement this idiom of creating two connected sockets with more-compatible code?
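
One portable approach is to fall back to a pair of TCP sockets connected over the loopback interface when socket.socketpair is unavailable. The sketch below illustrates this; make_socket_pair and tcp_socket_pair are hypothetical names, not rabbitpy functions.

```python
import socket


def tcp_socket_pair():
    """Emulate socketpair() with two TCP sockets connected over loopback."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        listener.bind(('127.0.0.1', 0))  # port 0: let the OS pick a free port
        listener.listen(1)
        client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client.connect(listener.getsockname())
        server, _ = listener.accept()
    finally:
        listener.close()
    return server, client


def make_socket_pair():
    """Use the native socketpair() where it exists, else the TCP fallback."""
    if hasattr(socket, 'socketpair'):
        return socket.socketpair()
    return tcp_socket_pair()
```

Data written to either socket must be bytes, for example sock.send(b'0').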


Also, I want to mention that I can't find any place that specifically mentions compatibility for Rabbitpy. I'm not sure if you plan on supporting Windows or not.

Closing connection raises AMQPPreconditionFailed

Running the example program at https://gist.github.com/kristaps/9985184 and pressing Ctrl-C on both clients causes the client where Ctrl-C was pressed first to throw an exception after a while; the other client seems to complete normally.

The traceback:

^CGot KeyboardInterrupt
Traceback (most recent call last):
  File "insults.py", line 59, in <module>
    conn.close()
  File "/Users/kristaps/.virtualenvs/celery/lib/python2.7/site-packages/rabbitpy/connection.py", line 168, in close
    self._shutdown_connection()
  File "/Users/kristaps/.virtualenvs/celery/lib/python2.7/site-packages/rabbitpy/connection.py", line 466, in _shutdown_connection
    self._channel0.close()
  File "/Users/kristaps/.virtualenvs/celery/lib/python2.7/site-packages/rabbitpy/channel0.py", line 58, in close
    self._write_frame(specification.Connection.Close())
  File "/Users/kristaps/.virtualenvs/celery/lib/python2.7/site-packages/rabbitpy/base.py", line 326, in _write_frame
    raise exception
rabbitpy.exceptions.AMQPPreconditionFailed: <pamqp.specification.Channel.Close object at 0x10eba5750>

The example program should be run something like this:

python insults.py a b
python insults.py b a  # params from the first invocation reversed

I don't see anything obviously wrong in the code. Am I missing something?

Client heartbeat is throwing incorrect ConnectionResetException

Hi,
Thank you for all your work on rabbitpy.
I think I have found a bug but I don't understand it fully.

First, it seems that the server (v3.1.5) sends a heartbeat only when no other messages are exchanged, so the current implementation fails in a scenario like this:

  • No message is exchanged during the negotiated heartbeat interval, so the server sends a heartbeat
  • rabbitpy answers and stores the time (Channel0._last_heartbeat)
  • Messages are then exchanged regularly, and the server doesn't send heartbeats
  • The heartbeat check is triggered and the stored heartbeat is too old since it was not refreshed: a ConnectionResetException is raised even though the connection is working perfectly
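
The scenario above suggests counting any inbound frame as proof of liveness, not only heartbeat frames. A sketch of that bookkeeping follows; LivenessTracker is a hypothetical class, and the factor of two missed intervals is an assumption rather than rabbitpy's behaviour.

```python
import time


class LivenessTracker(object):
    """Track the last inbound activity of any kind, not just heartbeats."""

    def __init__(self, interval, clock=time.time):
        self._interval = interval
        self._clock = clock
        self._last_activity = clock()

    def on_frame_received(self):
        # Call for every frame read from the socket, heartbeat or not.
        self._last_activity = self._clock()

    def is_stale(self, max_missed=2):
        """True when nothing has arrived for max_missed intervals."""
        idle = self._clock() - self._last_activity
        return idle > self._interval * max_missed
```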

Moreover, in some cases (which I can't reproduce every time), when I launch a basic consumer, the ConnectionResetException is raised in the io thread and not in the consuming thread, leaving the latter to hang forever (the corresponding io thread has crashed).

Here is a stacktrace for the io thread in this case:

Exception in thread 0x7fb5a0015ad0-io:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.6/site-packages/rabbitpy/io.py", line 348, in run
    self._loop.run()
  File "/usr/lib/python2.6/site-packages/rabbitpy/io.py", line 190, in run
    self._poll()
  File "/usr/lib/python2.6/site-packages/rabbitpy/io.py", line 235, in _poll
    self._read()
  File "/usr/lib/python2.6/site-packages/rabbitpy/io.py", line 249, in _read
    self._data.read_callback(self._data.fd.recv(MAX_READ))
  File "/usr/lib/python2.6/site-packages/rabbitpy/io.py", line 386, in on_read
    self._channels[0][0].on_frame(value[1])
  File "/usr/lib/python2.6/site-packages/rabbitpy/channel0.py", line 110, in on_frame
    self._write_frame(heartbeat.Heartbeat())
  File "/usr/lib/python2.6/site-packages/rabbitpy/base.py", line 318, in _write_frame
    self._check_for_exceptions()
  File "/usr/lib/python2.6/site-packages/rabbitpy/base.py", line 218, in _check_for_exceptions
    raise exception
ConnectionResetException: No heartbeat in 1799.9976542 seconds

Here is the stacktrace for the running consuming thread:

Thread 4 (Thread 0x7fb5a4cde700 (LWP 56466)):
#2 <built-in function sleep>
#4 file '/usr/lib64/python2.6/threading.py', in 'wait'
#8 file '/usr/lib64/python2.6/Queue.py', in 'get'
#12 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_read_from_queue'
#15 file '/usr/lib/python2.6/site-packages/rabbitpy/base.py', in '_wait_on_frame'
#19 file '/usr/lib/python2.6/site-packages/rabbitpy/channel.py', in '_consume_message'
#22 file '/usr/lib/python2.6/site-packages/rabbitpy/amqp_queue.py', in 'next_message'
#24 file '/usr/lib/python2.6/site-packages/rabbitpy/amqp_queue.py', in '__iter__'

I hope this will be enough for you to understand the problem, but if I can be of any help, I will be glad to help.

Getting stuck in connection.py infinite loop

This problem is somewhat niche and has been difficult for me to gather information about, but it has been causing a headache.

Summary and question about resolution:

A while loop in connection.Connection._connect has been seen to never exit due to behavior of other code through which Rabbitpy is making its connection. The connection fails silently, causing neither of the events which could leave the loop.

Are there plans to implement a connection attempt timeout with a reconnection strategy?

Further details:

I'm going to talk about our use of Paramiko, which you obviously shouldn't need to care about, but it establishes the problem less ambiguously, specifying as much as I know.

Due to technical limitations, we use SSH tunnels to connect some clients' machines and processes with our own servers. We often use Paramiko to establish the SSH tunnels in Python; its forward.py file demonstrates this.

For reasons still unknown to me, Paramiko sometimes fails to create a connection properly: the connection appears successful but is short-lived. When this happens, Paramiko logs the detection of EOF almost instantly, which is the same condition that ends a successful connection in the normal way, and closes the channel inside the tunnel. When Paramiko then receives a message destined for this now-closed channel, it inexplicably ignores it.

Paramiko prematurely closing its channels may very well be a bug in its code, as I can only reproduce it on Windows (and not on Linux or OS X). However, I still feel that the Rabbitpy Connection object should recognize that the current connection attempt has a problem if self._channel0.open is still False after a configurable timeout.
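
A minimal sketch of the kind of timeout described above, written as a standalone helper rather than as Rabbitpy's actual code. The names wait_until_open, ConnectionAttemptTimeout, is_open and has_failed are hypothetical; in Rabbitpy, is_open would presumably wrap a check such as self._channel0.open, but that wiring is an assumption and is not shown here.

```python
import time


class ConnectionAttemptTimeout(Exception):
    """Hypothetical exception: the connection did not open in time."""


def wait_until_open(is_open, has_failed, timeout, poll_interval=0.01):
    """Poll until the connection opens, fails, or the timeout elapses.

    is_open and has_failed are zero-argument callables supplied by the
    caller (assumed stand-ins for the two conditions that end the loop).
    Returns True if the connection opened and False if it failed;
    raises ConnectionAttemptTimeout if neither happened in time.
    """
    # time.monotonic() needs Python 3.3+; it is unaffected by clock changes.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_open():
            return True
        if has_failed():
            return False
        time.sleep(poll_interval)
    raise ConnectionAttemptTimeout(
        "connection not open after %.2f seconds" % timeout)
```

A caller could catch ConnectionAttemptTimeout and then decide whether to tear down the tunnel and retry, which is where a reconnection strategy would plug in.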
