
amqpy's People

Contributors

aszc, veegee


amqpy's Issues

Connection.last_heartbeat_recv is timezone-naive

When trying to detect connection failures, an application can look at the last_heartbeat_recv datetime (which comes from the underlying transport object) to determine whether RabbitMQ is still communicating. However, the object is timezone-naive, so when checking it during a DST transition (on systems where DST is in effect), there is no reliable way to determine how old the timestamp is.

Would it break any code if instead of

self.last_heartbeat_received = datetime.datetime.now()

it was changed to

self.last_heartbeat_received = datetime.datetime.now(tz=datetime.timezone.utc)

Presumably last_heartbeat_sent should be stored in UTC as well.
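As a self-contained illustration of the proposed change (the attribute is spelled as a plain variable here), timezone-aware datetimes make the heartbeat's age well defined regardless of local DST transitions:

```python
import datetime

# Record the heartbeat as a timezone-aware UTC datetime
last_heartbeat_recv = datetime.datetime.now(tz=datetime.timezone.utc)

# Later: subtraction between two aware datetimes is unambiguous,
# even across a DST transition on the local system
age = datetime.datetime.now(tz=datetime.timezone.utc) - last_heartbeat_recv
print(age.total_seconds() >= 0)  # True
```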

missed messages on a channel

I'm not sure how this is supposed to behave, but I'm fairly sure something is wrong somewhere. Consider a (quite simple) single-threaded script containing only the following:

import amqpy
from amqpy.consumer import AbstractConsumer
from amqpy.exceptions import Timeout
import time

exchange = 'file.topic.x'
queue = 'log.file_upload.q'

routing_key = 'log.eventlog'

count = 0

class Consumer(AbstractConsumer):

    def run(self, msg):
        print(msg)
        global count
        count += 1
        msg.ack()

def main():

    conn = amqpy.Connection()
    channel = conn.channel()

    channel.basic_recover(requeue=True)

    channel.exchange_declare(exchange, 'topic',
                             auto_delete=False)

    channel.queue_declare(queue, auto_delete=False)
    channel.queue_bind(queue, exchange, routing_key)

    consumer = Consumer(channel, queue)
    consumer.declare()

    while True:

        # comment out these two lines and the problem disappears
        ch = conn.channel()
        ch.close()

        try:
            conn.drain_events(0)
        except Timeout:
            time.sleep(1)


if __name__ == '__main__':
    try:
        main()
    finally:
        print("Tot count: %s" % count)

Launch this script in a terminal, then launch another script that publishes, say, 100 messages in a row to the given exchange/queue.

Observe that the consumer receives some of the published messages, but not all.

To verify:

Once the publish script has finished, wait a while to give the consumer time to process the 100 messages, then stop/interrupt the consumer script: it reports "Tot count: ..." with a value != 100.
Second verification: open your RabbitMQ web admin page and check the queue's "unacked" number: it is != 0 (at least in my situation).

That is, it is as if our consumer should have received all the messages (according to RabbitMQ itself), but some are never acked. The problem is that on the consumer side we do not get all of them, as if some were lost between the server and our client.

If we comment out the channel creation just after the while True, the consumer receives all the messages.

Any ideas?

threadlock with basic_get

When using multiple threads, each with its own channel, I run into a deadlock during calls to basic_get. The following code should reproduce it quickly. I suspect that basic_get is using the channel's lock instead of a shared lock from the connection.

import logging
import threading
from threading import Thread

from amqpy import Connection

log = logging.getLogger('threadlock')
log.addHandler(logging.StreamHandler())
log.setLevel(logging.DEBUG)


def read(connection):
    thread_name = threading.current_thread().getName()
    exchange_id = 'amqp.topic'
    queue_id = 'test.queue.{}'.format(thread_name)
    routing_key = queue_id

    log.debug('Starting {}'.format(thread_name))

    channel = connection.channel()
    channel.exchange_declare(exchange_id, 'topic', durable=True, auto_delete=False)
    channel.queue_declare(queue_id)
    channel.queue_bind(queue_id, exchange=exchange_id, routing_key=routing_key)

    read_count = 0
    while True:
        msg = channel.basic_get()  # hangs here once the deadlock occurs
        if msg is not None:
            read_count += 1
            log.debug('{}::{}'.format(thread_name, read_count))


def main():
    connection = Connection()

    threads = [Thread(target=read, args=(connection,)) for _ in range(25)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # keep the main thread alive without busy-waiting


if __name__ == '__main__':
    main()

Channel.basic_publish() vs Channel.basic_publish_confirm()

I don't see how basic_publish_confirm() is actually useful, except that it explicitly checks that the channel has been put into confirm mode.

But in my opinion this shouldn't be needed: if one wants to use channel confirm mode, one has to call channel.confirm_select() anyway, and then call basic_publish() (which notices the channel is in confirm mode and waits for the ack as necessary), and voilà, that's all.

So if someone wants to use confirm mode but does not call confirm_select(), that is their own fault; it is not up to amqpy to provide a way (basic_publish_confirm) to verify it.

So in my opinion we could simply drop basic_publish_confirm(). Simpler, better ;)

What do you think?

Support for RabbitMQ Clusters

Hi, I recently stumbled upon this library and replaced my kombu setup with it. So far it works great and is faster than kombu.

The only problem I'm having is that amqpy doesn't seem to support an AMQP connection string (e.g. amqp://<user>:<pass>@<host>:<port>) when establishing a connection. Moreover, there appears to be no way to connect to multiple servers in a cluster. With kombu I do this by simply providing amqp://<user>:<pass>@<host1>:<port1>;amqp://<user>:<pass>@<host2>:<port2>;amqp://<user>:<pass>@<host3>:<port3> as the connection parameter.

Maybe I missed something in the code; tell me if I'm wrong. It's the only thing keeping me from using this library in production, which is a shame considering how well it works.

adapt test for interrupt by signal for python3.5

https://docs.python.org/3.6/whatsnew/3.5.html#pep-475-retry-system-calls-failing-with-eintr

Note that the system call is only retried if the signal handler does not raise an exception.

So I think you need to raise an exception in the signal handler below:

(from: https://github.com/veegee/amqpy/blob/master/amqpy/tests/test_connection.py#L119)

def test_interrupted(self, conn):
    """Make sure to get InterruptedError if a read was interrupted
    """

    def sig_handler(sig, frame):
        pass

Otherwise, I think that test will fail on Python 3.5.
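A self-contained demonstration of the PEP 475 point on POSIX systems (the exception class is made up for the example): a handler that raises aborts the blocked call, whereas a handler that merely returns lets the call be retried transparently.

```python
import signal
import time

class Interrupted(Exception):
    pass

def sig_handler(sig, frame):
    # Raising here prevents the PEP 475 automatic retry of the system call
    raise Interrupted

signal.signal(signal.SIGALRM, sig_handler)
signal.alarm(1)  # deliver SIGALRM in one second

try:
    time.sleep(5)  # would be retried on EINTR if the handler returned normally
    result = 'slept through'
except Interrupted:
    result = 'interrupted'

print(result)  # interrupted
```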

how mandatory works

Hi,

We would like to use the mandatory flag of the basic_publish channel method (to know whether a message was actually delivered to a queue), but once it is set to True I don't know what else I need to do.

Do you have some resources or an example? I looked in the code and docs but could not find how this works.

Thanks.

gst.

possible counter problem with consumer dedicated threads

in https://github.com/veegee/amqpy/blob/master/amqpy/consumer.py#L92

    def _run(self, msg):
        self.run(msg)
        self.consume_count += 1

    def start(self, msg):
        if self.use_thread:
            t = Thread(target=self._run, args=(msg,), name='consumer-{}'.format(self.consumer_tag))
            t.start()
        else:
            self._run(msg)

If use_thread is enabled, then self.consume_count can lose some counts:

>>> counter = 0
>>> def r(N):
    global counter
    for _ in range(N):
        counter += 1
>>> import threading
>>> threads = [ threading.Thread(target=r, args=(100000,)) for i in range(10) ]
>>> x = [ t.start() for t in threads ]
>>> x = [ t.join() for t in threads ]  # wait for all threads to finish
>>> counter  # should have been 10 * 100000
921570
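One possible fix (a sketch, not amqpy's actual code) is to guard the increment with a lock shared by all consumer threads, which makes the read-modify-write atomic:

```python
import threading

counter = 0
counter_lock = threading.Lock()

def r(n):
    global counter
    for _ in range(n):
        with counter_lock:  # serialize the read-modify-write
            counter += 1

threads = [threading.Thread(target=r, args=(100000,)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait before reading the counter

print(counter)  # 1000000
```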

Add more examples

Add more examples to the examples directory and update the Sphinx docs with better examples.

Improve efficiency in multithreaded use cases

See #29

Connection and channel operations currently lock the connection and wait until the operation is complete. This provides thread safety, but slightly reduces performance in highly concurrent code when multiple threads are operating on their own channels. A better solution would be to allow concurrent channel operations safely.
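The idea can be sketched with per-channel locks (hypothetical class name; not amqpy's internals): work on one channel only serializes against that channel, so threads operating on different channels make progress independently.

```python
import threading

class SketchChannel:
    """Hypothetical channel with its own lock instead of the connection's."""
    def __init__(self):
        self._lock = threading.Lock()
        self.ops = 0

    def do_op(self):
        with self._lock:  # blocks only other users of *this* channel
            self.ops += 1

channels = [SketchChannel() for _ in range(4)]

def worker(ch, n):
    for _ in range(n):
        ch.do_op()

threads = [threading.Thread(target=worker, args=(ch, 10000)) for ch in channels]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(all(ch.ops == 10000 for ch in channels))  # True
```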

Message body is bytes

When I publish a message through the RabbitMQ admin interface (localhost:15672), the message body as received in my consumer is bytes.

For messages published with a regular publisher, however, the body received in the consumer is text/str.

Is this expected?
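Until the behaviour is unified, a defensive decode on the consumer side handles both cases (a sketch; UTF-8 is an assumption about the publisher's encoding):

```python
def body_as_text(body):
    """Normalize a message body to str, whether it arrived as bytes or str."""
    if isinstance(body, bytes):
        return body.decode('utf-8')
    return body

print(body_as_text(b'hello'))  # hello
print(body_as_text('hello'))   # hello
```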

Documentation: "run" declaration missing "self"

Just getting started with amqpy, but looks good so far.

Running through the simple introductory scripts on rtd, I found that the callback for the Consumer object seems to need self. The definition is currently:

def run(msg: Message):
    print('Received a message: {}'.format(msg.body))
    msg.ack()

but, when run, this fails because run() only expects msg, while it is called as self.run(msg) in consumer.py's _run() function. I believe the definition of run should be changed to:

def run(self, msg: Message):
    print('Received a message: {}'.format(msg.body))
    msg.ack()

'buffer too small for requested bytes' during Connect()

What am I doing wrong here?

jackc@wellgoddamn ~ $ python3  
Python 3.4.0 (default, Apr 11 2014, 13:05:11)
[GCC 4.8.2] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from amqpy import Connection
>>> conn = Connection()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.4/dist-packages/amqpy/connection.py", line 131, in __init__
    self.connect()
  File "/usr/local/lib/python3.4/dist-packages/amqpy/connection.py", line 152, in connect
    self.wait(spec.Connection.Start)
  File "/usr/local/lib/python3.4/dist-packages/amqpy/abstract_channel.py", line 67, in wait
    m = self._wait_method([method])
  File "/usr/local/lib/python3.4/dist-packages/amqpy/abstract_channel.py", line 104, in _wait_method
    method = self.connection.method_reader.read_method()
  File "/usr/local/lib/python3.4/dist-packages/amqpy/method_io.py", line 167, in read_method
    return self._read_method()
  File "/usr/local/lib/python3.4/dist-packages/amqpy/concurrency.py", line 39, in wrapper
    retval = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/amqpy/method_io.py", line 150, in _read_method
    raise method
  File "/usr/local/lib/python3.4/dist-packages/amqpy/method_io.py", line 66, in _next_method
    frame = self.transport.read_frame()
  File "/usr/local/lib/python3.4/dist-packages/amqpy/concurrency.py", line 39, in wrapper
    retval = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.4/dist-packages/amqpy/transport.py", line 179, in read_frame
    payload = self.read(frame.payload_size)
  File "/usr/local/lib/python3.4/dist-packages/amqpy/transport.py", line 328, in read
    return self._read(n, initial, _errnos=(errno.EAGAIN, errno.EINTR))
  File "/usr/local/lib/python3.4/dist-packages/amqpy/transport.py", line 119, in _read
    bytes_read = self.sock.recv_into(mview, to_read)
ValueError: buffer too small for requested bytes
>>> 
root@wellgoddamn ~ # qpidd -v
qpidd (qpidc) version 0.16
root@wellgoddamn ~ # netstat -anp|grep qpidd
tcp        0      0 0.0.0.0:5672            0.0.0.0:*               LISTEN      1635/qpidd      
tcp6       0      0 :::5672                 :::*                    LISTEN      1635/qpidd      
unix  2      [ ]         DGRAM                    16134    1635/qpidd    

Packet log shows some chatter before the error:

root@wellgoddamn ~ # tcpdump -Ani lo port 5672
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo, link-type EN10MB (Ethernet), capture size 65535 bytes
12:51:14.510843 IP 127.0.0.1.36665 > 127.0.0.1.5672: Flags [S], seq 2322166516, win 43690, options [mss 65495,sackOK,TS val 2808828482 ecr 0,nop,wscale 7], length 0
E..< .@[email protected].(.ir..........0.........
.kRB........
12:51:14.510863 IP 127.0.0.1.5672 > 127.0.0.1.36665: Flags [S.], seq 2217121944, ack 2322166517, win 43690, options [mss 65495,sackOK,TS val 2808828482 ecr 2808828482,nop,wscale 7], length 0
E..<..@.@.<..........(.9.&...ir......0.........
.kRB.kRB....
12:51:14.510874 IP 127.0.0.1.36665 > 127.0.0.1.5672: Flags [.], ack 1, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 0
E..4 .@[email protected].(.ir..&.....V.(.....
.kRB.kRB
12:51:14.510919 IP 127.0.0.1.36665 > 127.0.0.1.5672: Flags [P.], seq 1:9, ack 1, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 8
E..< .@[email protected].(.ir..&.....V.0.....
.kRB.kRBAMQP..  .
12:51:14.510935 IP 127.0.0.1.5672 > 127.0.0.1.36665: Flags [.], ack 9, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 0
E..4..@[email protected]..........(.9.&...ir....V.(.....
.kRB.kRB
12:51:14.511016 IP 127.0.0.1.5672 > 127.0.0.1.36665: Flags [P.], seq 1:9, ack 9, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 8
E..<..@[email protected]..........(.9.&...ir....V.0.....
.kRB.kRBAMQP...

12:51:14.511066 IP 127.0.0.1.5672 > 127.0.0.1.36665: Flags [F.], seq 9, ack 9, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 0
E..4..@[email protected]..........(.9.&...ir....V.(.....
.kRB.kRB
12:51:14.511107 IP 127.0.0.1.36665 > 127.0.0.1.5672: Flags [.], ack 9, win 342, options [nop,nop,TS val 2808828482 ecr 2808828482], length 0
E..4 .@.@..     .........9.(.ir..&.....V.(.....
.kRB.kRB
12:51:14.549965 IP 127.0.0.1.36665 > 127.0.0.1.5672: Flags [.], ack 10, win 342, options [nop,nop,TS val 2808828492 ecr 2808828482], length 0
E..4 .@[email protected].(.ir..&.....V.(.....
.kRL.kRB
^C
9 packets captured
18 packets received by filter
0 packets dropped by kernel

basic_get is non-blocking

I currently use a queue as a task queue for a cluster: each consumer should get only one task at a time and simply wait/block if no tasks are available. A blocking basic_get would solve this problem, and I could not come up with another solution. According to http://www.rabbitmq.com/amqp-0-9-1-quickref.html, the AMQP method basic.get is synchronous, probably to enable such use cases.
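In the meantime, blocking behaviour can be approximated client-side by polling (a sketch: get_fn stands in for any zero-argument callable, such as a bound channel.basic_get, that returns None when the queue is empty):

```python
import time

def blocking_get(get_fn, interval=0.05):
    """Poll get_fn until it returns a message (anything non-None)."""
    while True:
        msg = get_fn()
        if msg is not None:
            return msg
        time.sleep(interval)  # avoid hammering the broker

# Usage with a stub simulating a queue that is empty for two polls:
attempts = iter([None, None, 'task-1'])
msg = blocking_get(lambda: next(attempts), interval=0.01)
print(msg)  # task-1
```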

API documentation for AbstractConsumer

In the online docs, API documentation exists for almost all of the fundamental stuff except the AbstractConsumer class, which is subclassed in the usage examples.

After looking at the code, it seems the class already has docstrings in place. Is there a reason why its docs are missing (API instability, or some such)?

mandatory and channel.returned_messages bad behavior ?

see : https://gist.github.com/gst/ad5dff15e2fe37e9b3dc

Basically:

(If I am using the feature correctly,) it seems that returned_messages is not always handled correctly.

Running the gist can give different results from one run to another, for example:

bad multiple returned: ['0:1', '0:3']
bad some not_returned: ['0:0', '0:2', '0:4']

Another run:
bad some not_returned: ['0:0', '0:1', '0:2', '0:3', '0:4']

Sometimes it succeeds correctly.

But if I re-create/instantiate the channel in my_publish and call its close() instead of conn.loop(0), then it always succeeds (all messages are correctly returned, exactly once). EDIT: actually not even that... I just checked with more threads and more loops and got:

bad multiple returned: ['0:58']
bad some not_returned: ['0:57']

Drain Events fails

I get the following dump after processing over 100,000 messages. In the 4 most recent crashes, it happens on message number 113,851, 209,748, 139,802, and 118,996. The last two were with the exact same data.

Traceback (most recent call last):
  File "./mytestcode.py", line 64, in <module>
    conn.drain_events(timeout=None)
  File "/usr/local/lib/python3.4/site-packages/amqpy/connection.py", line 296, in drain_events
    method = self._wait_any(timeout)
  File "/usr/local/lib/python3.4/site-packages/amqpy/connection.py", line 283, in _wait_any
    method = self.method_reader.read_method(timeout)
  File "/usr/local/lib/python3.4/site-packages/amqpy/method_io.py", line 175, in read_method
    return self._read_method()
  File "/usr/local/lib/python3.4/site-packages/amqpy/concurrency.py", line 39, in wrapper
    retval = f(self, *args, **kwargs)
  File "/usr/local/lib/python3.4/site-packages/amqpy/method_io.py", line 162, in _read_method
    .format('Read:', method.method_type, METHOD_NAME_MAP[method.method_type]))
KeyError: method_t(class_id=10, method_id=60)

Connection.connect() can't be reused

>>> con = amqpy.Connection()
>>> con.close()
>>> con.connect()
Traceback (most recent call last):
  File "<pyshell#6>", line 1, in <module>
    con.connect()
  File "/home/gstarck/work/public/python/amqpy/amqpy/connection.py", line 151, in connect
    self.wait(spec.Connection.Start)
  File "/home/gstarck/work/public/python/amqpy/amqpy/abstract_channel.py", line 67, in wait
    m = self._wait_method([method])
  File "/home/gstarck/work/public/python/amqpy/amqpy/abstract_channel.py", line 104, in _wait_method
    method = self.connection.method_reader.read_method()
AttributeError: 'NoneType' object has no attribute 'method_reader'
>>> 

This should be corrected, I guess. It happens because AbstractChannel.__init__ is called from Connection.__init__, but during close() the Connection.connection attribute is reset to None; when Connection.connect() is called again, that attribute should also be reset correctly somehow.

possible bad assumption with abstract_channel._wait_method()

That function contains the check:

if allowed_methods is None

but I think that case can never happen.
The only code path where (I think) it could have occurred is at the end of this same function:

            if ch_id == 0:
                self.connection.wait()

# with:

    def wait(self, method=None):
        """Wait for the specified method from the server

        :param method: method to wait for, or `None` to wait for any method
        :type method: spec.method_t or None
        """
        m = self._wait_method([method])
        return self.handle_method(m)

But as you can see, in that case (method == None), _wait_method() is called with [None], not None.

So when method is None here, _wait_method() should be called with None as well, not [None].

Am I right? Do you agree?
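A minimal sketch of the proposed fix (the helper name is made up; in practice the change would be inline in Connection.wait()): forward None unchanged instead of wrapping it in a list.

```python
def allowed_methods_for(method):
    """Argument Connection.wait() should pass to _wait_method()."""
    return None if method is None else [method]

print(allowed_methods_for(None))  # None
print(allowed_methods_for('m'))   # ['m']
```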
