polyconseil / aioamqp
AMQP implementation using asyncio
License: Other
Looking at the Travis build history, I think there are some flaky tests, or some Tasks that are not cleanly terminated.
Hi,
when I create a connection with a virtualhost specified, my connection is reset by RabbitMQ. It happens every time with from_url, and with connect if I specify /myvhost as the vhost. It does not happen with connect if I remove the leading slash. I don't know what the expected behaviour is: whether the leading slash must be removed by the user (or by from_url) when creating a new connection with connect, or whether it must be stripped automatically in the open method of AmqpProtocol.
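For context, the ambiguity is just in how the URL path maps to the vhost; a standard-library sketch of the two candidate interpretations (this is illustrative parsing code, not aioamqp internals):

```python
from urllib.parse import urlparse

url = "amqp://guest:guest@localhost:5672/myvhost"
path = urlparse(url).path          # the raw path component, '/myvhost'
vhost_raw = path                   # passing this form triggers the reset
vhost_stripped = path.lstrip('/')  # 'myvhost' -- this form connects fine

print(vhost_raw, vhost_stripped)
```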
https://travis-ci.org/Polyconseil/aioamqp/jobs/153019740
Travis switched from Python 3.5.0 to 3.5.2 since the last master build, so this went unnoticed.
The upstream asyncio commit that broke the code is likely to be https://hg.python.org/cpython/rev/d1479e05ed0f. On our side, 1f5460a is most likely the guilty patch (though I don't think we want to revert that patch since the upstream commit was probably made for a very good reason).
Will look at it ASAP.
Thanks for writing and open sourcing this library. It's great to be able to work with AMQP and asyncio already.
I was curious if there were any current plans for supporting access to message headers. I have a messaging pattern that requires the consumer to interrogate the message headers before working with the body, and those attributes are currently unavailable with the current implementation.
Some libraries provide a "message" class that gives access to the content and headers of a message (txamqp is the library I have the most experience with), which might be an option.
Thanks again.
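A rough sketch of what such a wrapper might look like (the class and attribute names are hypothetical, not aioamqp API; properties is modeled here as a plain dict whose 'headers' key holds the AMQP application headers):

```python
class Message:
    """Hypothetical wrapper bundling a delivery's body, envelope and properties."""

    def __init__(self, body, envelope, properties):
        self.body = body
        self.envelope = envelope
        self.properties = properties

    @property
    def headers(self):
        # AMQP application headers live in the message properties
        return (self.properties or {}).get('headers', {})

msg = Message(b'payload', None, {'headers': {'retry-count': 2}})
print(msg.headers['retry-count'])
```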
no_wait requests use a common mechanism to cancel the waiter/future if the request raises an exception.
I was trying aioamqp (master) today under Python 3.4
I noticed some of the examples didn't finish well. Here's the associated stack trace:
python emit_log_topic.py
[x] Sent 'Hello World!'
Traceback (most recent call last):
File "emit_log_topic.py", line 38, in <module>
asyncio.get_event_loop().run_until_complete(exchange_routing())
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/base_events.py", line 208, in run_until_complete
return future.result()
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/futures.py", line 243, in result
raise self._exception
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/tasks.py", line 283, in _step
result = next(coro)
File "emit_log_topic.py", line 35, in exchange_routing
yield from asyncio.wait_for(protocol.client_close(), timeout=10)
AttributeError: 'AmqpProtocol' object has no attribute 'client_close'
--- Logging error ---
Traceback (most recent call last):
Exception ignored in: <generator object run at 0x10c621360>
Traceback (most recent call last):
File "/Users/xordoquy/Documents/Devs/aioamqp/aioamqp/protocol.py", line 191, in run
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1287, in exception
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1280, in error
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1386, in _log
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1396, in handle
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 1466, in callHandlers
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 837, in handle
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 961, in emit
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/logging/__init__.py", line 890, in handleError
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 169, in print_exception
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 153, in _format_exception_iter
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 18, in _format_list_iter
File "/opt/local/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/traceback.py", line 65, in _extract_tb_or_stack_iter
File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 15, in getline
File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 41, in getlines
File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/linecache.py", line 126, in updatecache
File "/Users/xordoquy/.virtualenvs/aioamqp/lib/python3.4/tokenize.py", line 431, in open
AttributeError: 'module' object has no attribute 'open'
Is there a way to get the connection and channel names from the driver?
I want to be able to hit the API from rabbitmq and query for just the one connection/channel.
I was able to get a working consumer going using your examples, but I'm trying to figure out how to acknowledge messages from within a callback passed to basic_consume(). I have something that looks like this:
yield from channel.basic_consume(queue_name, callback=callback)
And then my co-routine looks roughly like this:
@asyncio.coroutine
def callback(body, envelope, properties):
    loop = asyncio.get_event_loop()
    loop.create_task(do_work(body))
It doesn't look like I can acknowledge the message from within the callback. When using other synchronous AMQP clients, such as pika, the channel is passed to the callback, and then I can do something like ch.basic_ack(). Is there another approach? Or some other reason why this isn't viable with an asynchronous model? I'm fairly new to asyncio.
Thanks very much for your work on this project, I've found it to be helpful.
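For what it's worth, aioamqp's consumer callbacks do receive the channel as their first argument (as other snippets in this thread show), so the message can be acknowledged from inside the callback with channel.basic_client_ack(). A runnable sketch in modern async/await syntax; FakeChannel and FakeEnvelope are test stand-ins, not aioamqp classes:

```python
import asyncio

class FakeChannel:
    """Stand-in for an aioamqp channel, only so the sketch is runnable."""
    def __init__(self):
        self.acked = []

    async def basic_client_ack(self, delivery_tag):
        self.acked.append(delivery_tag)

class FakeEnvelope:
    delivery_tag = 1

async def callback(channel, body, envelope, properties):
    # ... process body ...
    # then ack on the channel that delivered the message
    await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

channel = FakeChannel()
asyncio.run(callback(channel, b'hello', FakeEnvelope(), None))
print(channel.acked)
```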
We should refactor our test cases. A clean test case should be reusable and should leave the RabbitMQ server clean.
Currently, the connect() and from_url() methods have one point of customization for SSL connections in the form of a verify_ssl argument. However, this misses the point as:
- asyncio's create_connection() accepts server_hostname and ssl arguments
- the ssl argument can be an ssl.SSLContext instance that can be customized further than just deciding whether or not server-side certificates should be verified.
In order to support such options, and to be future-compatible with any new options introduced to customize ssl.SSLContext instances, I suggest the following:
- deprecate the verify_ssl argument. It appears to be a short-sighted attempt to deal with the new default SSL policy introduced with newer versions of Python.
- add an ssl argument that accepts ssl.SSLContext instances, and forward such instances to asyncio's create_connection().
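A sketch of the proposed usage. The context-building part is standard-library code; the `ssl` parameter on connect() is the proposal, not current aioamqp API:

```python
import ssl

# Build a customized client-side SSL context (stdlib only, nothing
# aioamqp-specific): verify server certs, enforce hostname checks, TLS >= 1.2.
ctx = ssl.create_default_context()
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
ctx.minimum_version = ssl.TLSVersion.TLSv1_2

# Hypothetical forwarding: aioamqp.connect(host=..., ssl=ctx) would hand ctx
# through to loop.create_connection(..., ssl=ctx, server_hostname=host).
```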
WARNING:aioamqp.protocol:Connection lost exc=BrokenPipeError(32, 'Broken pipe')
ERROR:asyncio:Exception in callback AmqpProtocol._heartbeat_timer_recv_timeout()
handle: <TimerHandle when=75311.867231919 AmqpProtocol._heartbeat_timer_recv_timeout()>
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/events.py", line 125, in _run
self._callback(*self._args)
File "/usr/lib/python3.5/site-packages/aioamqp/protocol.py", line 318, in _heartbeat_timer_recv_timeout
self._stream_writer.close()
AttributeError: 'NoneType' object has no attribute 'close'
Steps to reproduce:
Delivery tags are server generated (and no guarantees are stated in the spec), at present aioamqp assumes that they will be incrementing by 1 from 1, which seems to hold (at least for rabbitmq).
However, the return method does not receive the delivery tag, so the asynchronous publishes that are currently allowed are not possible to implement with this.
Probably, strictly speaking, we shouldn't be guessing the delivery tag anyway.
The time between events depends directly on the heartbeat value:
2016-10-06 11:35:55,628 | INFO | 14179 | aioamqp.protocol | 286 | Close connection
2016-10-06 11:36:10,999 | WARNING | 14179 | aioamqp.protocol | 100 | Connection lost exc=None
Hi!
I have modified the RPC server and client and they are very slow.
Client (sends requests in an infinite loop):
#!/usr/bin/env python
"""
RPC client, aioamqp implementation of RPC examples from RabbitMQ tutorial
"""
import asyncio
import uuid

import aioamqp


class FibonacciRpcClient(object):
    def __init__(self):
        self.transport = None
        self.protocol = None
        self.channel = None
        self.callback_queue = None
        self.waiter = asyncio.Event()

    @asyncio.coroutine
    def connect(self):
        """ an `__init__` method can't be a coroutine"""
        self.transport, self.protocol = yield from aioamqp.connect()
        self.channel = yield from self.protocol.channel()
        result = yield from self.channel.queue_declare(queue_name='', exclusive=True)
        self.callback_queue = result['queue']

        yield from self.channel.basic_consume(
            self.on_response,
            no_ack=True,
            queue_name=self.callback_queue,
        )

    @asyncio.coroutine
    def on_response(self, channel, body, envelope, properties):
        if self.corr_id == properties.correlation_id:
            self.response = body
        self.waiter.set()

    @asyncio.coroutine
    def call(self, n):
        if not self.protocol:
            yield from self.connect()
        self.response = None
        self.waiter.clear()
        self.corr_id = str(uuid.uuid4())
        yield from self.channel.basic_publish(
            payload=str(n),
            exchange_name='',
            routing_key='rpc_queue',
            properties={
                'reply_to': self.callback_queue,
                'correlation_id': self.corr_id,
            },
        )
        yield from self.waiter.wait()
        # yield from self.protocol.close()
        return self.response


@asyncio.coroutine
def rpc_client():
    fibonacci_rpc = FibonacciRpcClient()
    print(" [x] Requesting fib(30)")
    while True:
        response = yield from fibonacci_rpc.call(30)
        print(" [.] Got %r" % response)


asyncio.get_event_loop().run_until_complete(rpc_client())
Server - no calculations, just echo:
"""
RPC server, aioamqp implementation of RPC examples from RabbitMQ tutorial
"""
import asyncio
import aioamqp
@asyncio.coroutine
def on_request(channel, body, envelope, properties):
n = int(body)
print(" [.] fib(%s)" % n)
# response = fib(n)
response = n
yield from channel.basic_publish(
payload=str(response),
exchange_name='',
routing_key=properties.reply_to,
properties={
'correlation_id': properties.correlation_id,
},
)
yield from channel.basic_client_ack(delivery_tag=envelope.delivery_tag)
@asyncio.coroutine
def rpc_server():
transport, protocol = yield from aioamqp.connect()
channel = yield from protocol.channel()
yield from channel.queue_declare(queue_name='rpc_queue')
yield from channel.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
yield from channel.basic_consume(on_request, queue_name='rpc_queue')
print(" [x] Awaiting RPC requests")
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(rpc_server())
event_loop.run_forever()
And I get only 25 requests per second.
Why so slow? Why exactly 25?
Thanks
INFO 2016-09-30 16:33:48,614 asyncio poll 2994.933 ms took 2998.075 ms: timeout
WARNING 2016-09-30 16:33:48,615 aioamqp.protocol Connection lost exc=None
INFO 2016-09-30 16:33:48,621 aioamqp.protocol Close connection
WARNING 2016-09-30 16:33:48,630 asyncio socket.send() raised exception.
WARNING 2016-09-30 16:33:48,630 asyncio socket.send() raised exception.
WARNING 2016-09-30 16:33:48,631 asyncio socket.send() raised exception.
[...]
The patch for #111 actually made things much worse. This is thoroughly embarrassing.
When the server closed the connection unexpectedly, my script (a consumer) kept running and did nothing (run_forever). All I can do is run some function after basic_consume that periodically checks the connection state. I think that is the wrong approach for async scripts. :)
Traceback (most recent call last):
File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 255, in run
yield from self.dispatch_frame()
File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 210, in dispatch_frame
yield from self.channels[frame.channel].dispatch_frame(frame)
File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
yield from methods[(frame.class_id, frame.method_id)](frame)
File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 718, in basic_deliver
content_body_frame = yield from self.protocol.get_frame()
File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 183, in get_frame
yield from frame.read_frame()
File "/usr/local/lib/python3.5/site-packages/aioamqp/frame.py", line 434, in read_frame
payload_data = yield from self.reader.readexactly(self.frame_length)
File "/usr/local/lib/python3.5/asyncio/streams.py", line 509, in readexactly
block = yield from self.read(n)
File "/usr/local/lib/python3.5/asyncio/streams.py", line 482, in read
yield from self._wait_for_data('read')
File "/usr/local/lib/python3.5/asyncio/streams.py", line 423, in _wait_for_data
yield from self._waiter
File "/usr/local/lib/python3.5/asyncio/futures.py", line 358, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
future.result()
File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/lib/python3.5/asyncio/selector_events.py", line 702, in write
n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
2016-02-12 19:07:02 71897a856041 aioamqp.protocol[1] INFO Close connection
As an option, we could add some callback/handler for the Exception case in protocol.py:

@asyncio.coroutine
def run(self):
    while not self.stop_now.done():
        try:
            yield from self.dispatch_frame()
        except exceptions.AmqpClosedConnection as exc:
            logger.info("Close connection")
            self.stop_now.set_result(None)
            self._close_channels(exception=exc)
        except Exception:
            logger.exception('error on dispatch')
I'm trying to set up my code to be able to consume from multiple queues concurrently, using one channel.
Apart from connecting and creating the channel, this is the relevant part of the code:
tasks = []
for u in users:
    tasks.append(channel.basic_consume(callback, queue_name=u.name))

await asyncio.gather(*tasks)
Unfortunately, when I run it, I receive the following "Waiter already exists" error:
File "manage.py", line 10, in <module>
execute_from_command_line(sys.argv)
File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 367, in execute_from_command_line
utility.execute()
File "/home/app/.local/lib/python3.5/site-packages/django/core/management/__init__.py", line 359, in execute
self.fetch_command(subcommand).run_from_argv(self.argv)
File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 305, in run_from_argv
self.execute(*args, **cmd_options)
File "/home/app/.local/lib/python3.5/site-packages/django/core/management/base.py", line 356, in execute
output = self.handle(*args, **options)
File "/code/callgen/management/commands/filldata.py", line 79, in handle
loop.run_until_complete(self.connect(loop))
File "/usr/local/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
return future.result()
File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 241, in _step
result = coro.throw(exc)
File "/code/callgen/management/commands/filldata.py", line 75, in connect
await asyncio.gather(*tasks)
File "/usr/local/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
future.result()
File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 617, in basic_consume
'basic_consume', frame, request, no_wait, timeout=timeout)
File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 196, in _write_frame_awaiting_response
f = self._set_waiter(waiter_id)
File "/home/app/.local/lib/python3.5/site-packages/aioamqp/channel.py", line 41, in _set_waiter
raise exceptions.SynchronizationError("Waiter already exists")
aioamqp.exceptions.SynchronizationError: Waiter already exists
Is there a way to consume from multiple queues using one channel?
Thanks!
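Until that is supported, one workaround (assuming the limitation is only that a single basic_consume RPC can be awaiting its response per channel at a time) is to register the consumers sequentially; consumption itself still proceeds concurrently once they are registered. A toy model of the one-waiter behaviour, not aioamqp code:

```python
import asyncio

class OneWaiterChannel:
    """Toy model of the channel: only one pending RPC waiter is allowed."""
    def __init__(self):
        self._waiter = None
        self.consumers = []

    async def basic_consume(self, callback, queue_name):
        if self._waiter is not None:
            raise RuntimeError("Waiter already exists")
        self._waiter = queue_name
        await asyncio.sleep(0)      # simulate the broker round-trip
        self._waiter = None
        self.consumers.append(queue_name)

async def main():
    results = {}
    # Concurrent registration on one channel fails ...
    channel = OneWaiterChannel()
    try:
        await asyncio.gather(channel.basic_consume(None, 'a'),
                             channel.basic_consume(None, 'b'))
    except RuntimeError as exc:
        results['error'] = str(exc)
    # ... while sequential registration works.
    channel = OneWaiterChannel()
    for name in ('a', 'b'):
        await channel.basic_consume(None, queue_name=name)
    results['consumers'] = channel.consumers
    return results

results = asyncio.run(main())
print(results)
```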
When the content of a body frame is bigger than the maximum size supported by the server, sending it results in a connection reset by peer and the following error message:
aioamqp.protocol: WARNING: Server closed connection: FRAME_ERROR - type 3, all octets = <<>>: {frame_too_large,1000000,131064}, code=501, class_id=0, method_id=0
Trying to receive it will only process the first body frame of the message.
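A fix on the sending side would presumably split the body across multiple body frames of at most the negotiated frame_max; the 131064 in the error above is frame_max (131072) minus the 8 octets of per-frame overhead. A minimal chunking sketch, not aioamqp code:

```python
def split_payload(payload: bytes, frame_max: int = 131072):
    """Yield chunks that each fit in one AMQP body frame.

    AMQP frames carry 8 octets of overhead (7-octet header plus the
    frame-end octet), hence 131064 usable octets for a 131072 frame_max.
    """
    chunk_size = frame_max - 8
    for offset in range(0, len(payload), chunk_size):
        yield payload[offset:offset + chunk_size]

chunks = list(split_payload(b'x' * 300000))
print([len(c) for c in chunks])  # [131064, 131064, 37872]
```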
There is currently no implementation of the amqp's heartbeat frame.
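For reference, the heartbeat frame itself is trivial to construct: an empty payload on channel 0. Note that although the AMQP 0-9-1 spec text lists the heartbeat frame type as 4, RabbitMQ and common clients use 8 in practice (see the RabbitMQ 0-9-1 errata). A sketch:

```python
import struct

FRAME_HEARTBEAT = 8  # value used in practice by RabbitMQ and common clients
FRAME_END = 0xCE

def heartbeat_frame() -> bytes:
    # frame type (1 octet) + channel (2 octets) + payload size (4 octets),
    # followed by the frame-end octet; a heartbeat carries no payload
    return struct.pack('!BHI', FRAME_HEARTBEAT, 0, 0) + bytes([FRAME_END])

frame = heartbeat_frame()
print(frame.hex())
```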
Hello,
How do I correctly handle this event and reconnect to the server? (I don't see callbacks like on_connect/on_disconnect/etc.)
With a single connection and a single channel I am unable to handle more than one simultaneous message.
Am I missing something?
If I create multiple connections, I'm able to simultaneously consume one message per connection (thus, 10 connections give 10 simultaneously handled messages). This also confirms that it is not my handler code that is blocking asyncio.
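One pattern that restores concurrency on a single channel, assuming the frame loop dispatches deliveries one callback at a time: have the callback schedule the real work as a task and return immediately. A runnable sketch; handle() and the fake delivery loop are illustrative, not aioamqp code:

```python
import asyncio

async def handle(body):
    await asyncio.sleep(0.01)   # simulated slow work
    return body

async def callback(channel, body, envelope, properties):
    # Return immediately so the frame loop can dispatch the next delivery;
    # the actual processing runs concurrently as a task.
    return asyncio.ensure_future(handle(body))

async def main():
    # simulate three rapid deliveries arriving on one channel
    tasks = [await callback(None, i, None, None) for i in range(3)]
    return await asyncio.gather(*tasks)

results = asyncio.run(main())
print(results)
```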
Hi,
according to the documentation https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish, the basic_publish method should return a not-found error code if the exchange doesn't exist, but it returns nothing and doesn't raise any exception:

await channel.basic_publish(
    payload=pickle.dumps(request),
    exchange_name='non_existing_exc',
    routing_key=q_name)

Is this an issue, or is it supposed to work this way? Also, is there any way to check whether an exchange or queue exists?
Thanks
We need to add a few client properties when the client connects to the server.
For the moment, properties are hardcoded:
https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/protocol.py#L107
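A sketch of making them configurable, assuming a merge-over-defaults approach (the helper name and the default keys here are illustrative; RabbitMQ displays these properties in its management UI):

```python
DEFAULT_CLIENT_PROPERTIES = {
    'product': 'aioamqp',
    'platform': 'Python',
}

def build_client_properties(overrides=None):
    """Merge user-supplied client properties over the library defaults."""
    properties = dict(DEFAULT_CLIENT_PROPERTIES)
    properties.update(overrides or {})
    return properties

props = build_client_properties({'product': 'my-app', 'version': '1.0'})
print(props)
```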
I'm using aioamqp 0.7 in a Python 3.5 environment.
About 180 seconds after I called basic_consume
await self.channel.basic_consume(queue_name='worker_heartbeat', callback=self.handle_heartbeat, no_ack=True)
there are error messages in my terminal:
[2016-05-07 18:39:39,461] asyncio:ERROR: Task exception was never retrieved
future: <Task finished coro=<disconnected() done, defined at /home/vagrant/Project/foo/bar/core/mq_connection.py:31> exception=AmqpClosedConnection()>
Traceback (most recent call last):
File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 413, in read_frame
data = yield from self.reader.readexactly(7)
File "/usr/lib/python3.5/asyncio/streams.py", line 659, in readexactly
block = yield from self.read(n)
File "/usr/lib/python3.5/asyncio/streams.py", line 617, in read
yield from self._wait_for_data('read')
File "/usr/lib/python3.5/asyncio/streams.py", line 451, in _wait_for_data
yield from self._waiter
File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.5/asyncio/tasks.py", line 297, in _wakeup
future.result()
File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
raise self._exception
File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
result = coro.send(None)
File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
raise exception
File "/usr/lib/python3.5/asyncio/selector_events.py", line 663, in _read_ready
data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/lib/python3.5/asyncio/tasks.py", line 240, in _step
result = coro.send(None)
File "/home/vagrant/Project/foo/bar/core/mq_connection.py", line 37, in disconnected
raise exception
File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 262, in run
yield from self.dispatch_frame()
File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 204, in dispatch_frame
frame = yield from self.get_frame()
File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/protocol.py", line 189, in get_frame
yield from frame.read_frame()
File "/home/vagrant/venv/foo/lib/python3.5/site-packages/aioamqp/frame.py", line 415, in read_frame
raise exceptions.AmqpClosedConnection() from ex
aioamqp.exceptions.AmqpClosedConnection
At the same time I got error messages in the RabbitMQ log like:
=INFO REPORT==== 7-May-2016::18:36:39 ===
accepting AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672)
=ERROR REPORT==== 7-May-2016::18:39:39 ===
closing AMQP connection <0.5469.0> (192.168.50.4:53416 -> 192.168.50.4:5672):
Missed heartbeats from client, timeout: 60s
It seems that after 3 missed heartbeats, the RabbitMQ server closes the connection.
And I notice that if nothing is published to the queue I consume, the connection is not closed.
But in the rest of my project I create channels and consume messages the same way, and nothing seems to be wrong.
My question is:
please forgive my poor English.
Steps to reproduce on aioamqp 0.8.2:
Result:
- self.server_heartbeat = decoder.read_short()
- self._heartbeat_timer_recv_reset() is called unconditionally on line 108
- the guard only checks self.server_heartbeat is None (which is never true, it's 0)

Here's a traceback. Because connection.open-ok is the last frame the broker sends on establishing the connection, my code got to calling AmqpProtocol.channel() before getting an exception:
Traceback (most recent call last):
File "reproduce.py", line 123, in run
channel = yield from connection.channel()
File "/usr/lib/python3/dist-packages/aioamqp/protocol.py", line 444, in channel
yield from channel.open()
File "/usr/lib/python3/dist-packages/aioamqp/channel.py", line 134, in open
yield from fut
File "/usr/lib/python3.4/asyncio/futures.py", line 388, in __iter__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.4/asyncio/tasks.py", line 286, in _wakeup
value = future.result()
File "/usr/lib/python3.4/asyncio/futures.py", line 277, in result
raise self._exception
aioamqp.exceptions.ChannelClosed: (None, None)
I use the on_error parameter of aioamqp.connect to pass a handler which reestablishes the connection when it is accidentally closed. But it is called even after the channel is "legally" closed, which is definitely not an error case. And its exception argument looks weird: ChannelClosed(ChannelClosed(None, None), 'Channel is closed').
Here is the code to reproduce the issue:
import logging
import asyncio

import aioamqp

logger = logging.getLogger(__name__)


def on_error(exc):
    logger.error('Handle error %r', exc)


async def run(loop):
    transport, protocol = await aioamqp.connect(on_error=on_error, loop=loop)
    channel = await protocol.channel()

    logger.info('Close channel')
    await channel.close()

    logger.info('Close protocol')
    await protocol.close()

    logger.info('Close transport')
    transport.close()


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop=loop))
Here is its output:
DEBUG:asyncio:Using selector: EpollSelector
INFO:aioamqp.protocol:Recv open ok
DEBUG:aioamqp.channel:Channel is open
INFO:__main__:Close channel
INFO:aioamqp.channel:Channel closed
INFO:__main__:Close protocol
INFO:aioamqp.protocol:Recv close ok
INFO:__main__:Close transport
WARNING:aioamqp.protocol:Connection lost exc=None
ERROR:__main__:Handle error ChannelClosed(ChannelClosed(None, None), 'Channel is closed')
Environment:
For now, the library does not reuse previously used channel IDs.
Thus, it can't use more than 2^16 channels.
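A sketch of what ID recycling could look like, using a simple free list over the 16-bit ID space (the class and method names are hypothetical, not aioamqp API):

```python
class ChannelIdAllocator:
    """Hand out 16-bit channel IDs, recycling released IDs first."""
    MAX_ID = 2 ** 16 - 1  # ID 0 is reserved for connection-level frames

    def __init__(self):
        self._next = 1
        self._free = []

    def acquire(self):
        if self._free:
            return self._free.pop()
        if self._next > self.MAX_ID:
            raise RuntimeError("no free channel id")
        cid = self._next
        self._next += 1
        return cid

    def release(self, cid):
        self._free.append(cid)

alloc = ChannelIdAllocator()
first = alloc.acquire()
second = alloc.acquire()
alloc.release(first)
reused = alloc.acquire()   # the released ID comes back instead of 3
print(first, second, reused)
```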
The asyncio documentation says:
Don't call the set_result() or set_exception() method of a Future if the future is cancelled: it would fail with an exception. For example, write:
if not fut.cancelled():
    fut.set_result('done')
The future state is not checked and breaks (I don't know how the future gets cancelled yet):
error on dispatch
Traceback (most recent call last):
File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 196, in run
yield from self.dispatch_frame()
File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/protocol.py", line 177, in dispatch_frame
yield from self.channels[frame.channel].dispatch_frame(frame)
File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 91, in dispatch_frame
yield from methods[(frame.class_id, frame.method_id)](frame)
File "/usr/lib/python3.4/asyncio/tasks.py", line 84, in coro
res = func(*args, **kw)
File "/home/azureuser/.local/share/virtualenvs/pfc/lib/python3.4/site-packages/aioamqp/channel.py", line 120, in open_ok
fut.set_result(True)
File "/usr/lib/python3.4/asyncio/futures.py", line 298, in set_result
raise InvalidStateError('{}: {!r}'.format(self._state, self))
asyncio.futures.InvalidStateError: CANCELLED: Future<CANCELLED>
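The guard recommended by the asyncio docs can be demonstrated directly:

```python
import asyncio

async def main():
    fut = asyncio.get_running_loop().create_future()
    fut.cancel()
    # Without this guard, fut.set_result(...) would raise InvalidStateError
    if not fut.cancelled():
        fut.set_result('done')
    return fut.cancelled()

was_cancelled = asyncio.run(main())
print(was_cancelled)
```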
Upgrading from 0.4.0 to 0.5.1 with no code changes I get this error. Did the API change?
aioamqp.exceptions.ConfigurationError: basic_consume requires a coroutine callback
This is a spurious error.
One might want to use tools like functools.partial, or a nested function, to create the callback. Example:

yield from my_channel.basic_consume(my_queue, callback=functools.partial(self._on_rpc, my_data))

Checking for None (or maybe callable()) should be sufficient.
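The contrast with the suggested None/callable() check can be shown directly; handler and the bound data below are illustrative, not aioamqp code:

```python
import asyncio
import functools

async def handler(extra, body):
    return (extra, body)

# Binding extra data with functools.partial, as suggested above
cb = functools.partial(handler, 'my_data')

is_callable = callable(cb)                  # a None/callable() check passes
is_coroutine_obj = asyncio.iscoroutine(cb)  # it is not a coroutine *object* yet
result = asyncio.run(cb(b'payload'))        # calling it yields a runnable coroutine
print(is_callable, is_coroutine_obj, result)
```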
I have one failure in the tests (the other tests succeed):
nosetests --verbosity=2 aioamqp.tests.test_protocol:ProtocolTestCase.test_connection_unexistant_vhost
test_connection_unexistant_vhost (aioamqp.tests.test_protocol.ProtocolTestCase) ... FAIL
======================================================================
FAIL: test_connection_unexistant_vhost (aioamqp.tests.test_protocol.ProtocolTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/mastak/aioamqp/aioamqp/tests/testing.py", line 41, in wrapper
self.loop.run_until_complete(asyncio.wait_for(coro(self), timeout=timeout_, loop=self.loop))
File "/usr/lib/python3.4/asyncio/base_events.py", line 316, in run_until_complete
return future.result()
File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
raise self._exception
File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
result = next(coro)
File "/usr/lib/python3.4/asyncio/tasks.py", line 377, in wait_for
return fut.result()
File "/usr/lib/python3.4/asyncio/futures.py", line 275, in result
raise self._exception
File "/usr/lib/python3.4/asyncio/tasks.py", line 238, in _step
result = next(coro)
File "/home/mastak/aioamqp/aioamqp/tests/test_protocol.py", line 43, in test_connection_unexistant_vhost
yield from amqp_connect(virtualhost='/unexistant', loop=self.loop)
nose.proxy.AssertionError: AmqpClosedConnection not raised
-------------------- >> begin captured stdout << ---------------------
http://localhost:15672/api/vhosts/%2Ftest-aioamqp
http://localhost:15672/api/vhosts/%2Ftest-aioamqp
http://localhost:15672/api/permissions/%2Ftest-aioamqp/guest
--------------------- >> end captured stdout << ----------------------
-------------------- >> begin captured logging << --------------------
asyncio: DEBUG: Using selector: EpollSelector
aioamqp.protocol: INFO: Recv open ok
aioamqp.channel: DEBUG: Channel is open
aioamqp.protocol: WARNING: Server closed connection: NOT_ALLOWED - access to vhost '/unexistant' refused for user 'guest', code=530, class_id=10, method_id=40
aioamqp.tests.testcase: DEBUG: Delete channel <aioamqp.tests.testcase.ProxyChannel object at 0x7f7745bc4d30>
aioamqp.tests.testcase: DEBUG: Delete channel <aioamqp.tests.testcase.ProxyChannel object at 0x7f7745bc4d30>
aioamqp.tests.testcase: DEBUG: Delete amqp <aioamqp.tests.testcase.ProxyAmqpProtocol object at 0x7f7745bc4e10>
aioamqp.protocol: ERROR: error on dispatch
Traceback (most recent call last):
File "/home/mastak/aioamqp/aioamqp/protocol.py", line 255, in run
yield from self.dispatch_frame()
File "/home/mastak/aioamqp/aioamqp/protocol.py", line 210, in dispatch_frame
yield from self.channels[frame.channel].dispatch_frame(frame)
File "/home/mastak/aioamqp/aioamqp/channel.py", line 111, in dispatch_frame
yield from methods[(frame.class_id, frame.method_id)](frame)
File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
res = func(*args, **kw)
File "/home/mastak/aioamqp/aioamqp/channel.py", line 166, in close_ok
self._get_waiter('close').set_result(True)
File "/home/mastak/aioamqp/aioamqp/channel.py", line 50, in _get_waiter
raise exceptions.SynchronizationError("Call %s didn't set a waiter" % rpc_name)
aioamqp.exceptions.SynchronizationError: Call close didn't set a waiter
aioamqp.protocol: WARNING: Server closed connection: CHANNEL_ERROR - expected 'channel.open', code=504, class_id=20, method_id=40
aioamqp.protocol: WARNING: Connection lost exc=None
--------------------- >> end captured logging << ---------------------
----------------------------------------------------------------------
Ran 1 test in 0.151s
Also, I have many test failures when I run them on macOS. It has some limits with Docker, and I want to fix that. But I think it would be better if all the tests pass on Linux first, and only after that I will make changes.
Going from the RPC example ... Let's assume I want to call my caller again in the on_request callback and wait for the response. How can I achieve this without a deadlock?
Currently when the protocol's connection_lost is called by asyncio, no channel listeners notice it. If a greenlet is waiting on a channel.consume() it will be stuck there for good.
I am noticing this in our staging environment. We sometimes see:
[WARNING aioamqp.protocol protocol: MainProcess] Connection lost exc=ConnectionResetError(104, 'Connection reset by peer')
[INFO aioamqp.protocol protocol: MainProcess] Close connection
And after this log message, the program keeps waiting forever on a channel.consume() but it does not receive messages anymore.
I guess it could be nice to propagate the exceptions to all open channels, a bit like server_channel_close in Channel does.
If that's ok for you, I could try and submit a patch.
Hello,
Does driver work?
I only see this:
(.venv)vg %> python test_receive.py
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
# rabbitmqctl list_channels
Listing channels ...
<[email protected]> python_test 1 5
I sent 3 messages to:
(.venv)vg %> python send.py
Hello World!
(.venv)vg %> python send.py
Hello World!
(.venv)vg %> python send.py
Hello World!
But I still see and expecting messages:
(.venv)vg %> python test_receive.py
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
b'py2.queue'
Okay, time to run a receiver based on pika:
(.venv)vg %> python recive.py
Received b'Hello backend2'
Received b'Hello backend2'
Received b'Hello backend2'
# rabbitmqctl list_channels
Listing channels ...
<[email protected]> python_test 1 5
<[email protected]> python_test 1 0
test_receive.py:
#!/usr/bin/env python
import asyncio

import aioamqp


@asyncio.coroutine
def callback(body, envelope, properties):
    print(body)


@asyncio.coroutine
def receive():
    try:
        transport, protocol = yield from aioamqp.connect(host='10.10.80.23', port=5672,
            login='python_test', password='python_test', virtualhost='hello')
    except aioamqp.AmqpClosedConnection:
        print("closed connections")
        return

    channel = yield from protocol.channel()
    queue_name = 'py2.queue'

    yield from asyncio.wait_for(channel.queue(queue_name, durable=False, auto_delete=False), timeout=10)
    yield from asyncio.wait_for(channel.basic_consume(queue_name, callback=callback), timeout=10)


asyncio.get_event_loop().run_until_complete(receive())
asyncio.get_event_loop().run_forever()
send.py:
#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials('python_test', 'python_test')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.10.80.25', credentials=credentials, virtual_host='hello'))
channel = connection.channel()
channel.queue_declare(queue='py2.queue')
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello backend2')
print("Hello World!")
connection.close()
recive.py:
#!/usr/bin/env python
import pika

credentials = pika.PlainCredentials('python_test', 'python_test')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='10.10.80.23', credentials=credentials, virtual_host='hello'))
channel = connection.channel()
channel.queue_declare(queue='py2.queue')


def callback(ch, method, properties, body):
    print("Received {}".format(body))


channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
channel.start_consuming()
Channel objects have publish() and basic_publish(), which behave subtly differently (right now I am looking at waiting for acks in confirm mode).
They should probably be consolidated.
It seems aioamqp.connect now returns both the transport and the protocol objects. And the docs at http://aioamqp.readthedocs.org/en/latest/api.html#starting-a-connection instruct you to close the connection using yield from transport.close().
Is returning the transport to the library's user a good idea? It seems to make things more complicated than necessary, as AmqpProtocol also has a close method.
ERROR:aioamqp.protocol:error on dispatch
Traceback (most recent call last):
File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 284, in run
yield from self.dispatch_frame()
File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 231, in dispatch_frame
frame = yield from self.get_frame()
File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/protocol.py", line 215, in get_frame
yield from frame.read_frame()
File "/home/vagrant/venv/lib/python3.5/site-packages/aioamqp-0.8.2-py3.5.egg/aioamqp/frame.py", line 412, in read_frame
data = yield from self.reader.readexactly(7)
AttributeError: 'NoneType' object has no attribute 'readexactly'
This error occurs randomly, but it occurs many times. And because it pollutes the logs, I can't locate its cause.
Sorry that I can't reproduce it this time. Just letting you know about it.
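For what it's worth, a defensive pattern on the application side while this is investigated is a retry wrapper around the connection; `connect_fn` below stands in for `aioamqp.connect`, and the whole block is an illustrative sketch, not part of the library:

```python
import asyncio

# Hedged sketch: retry connecting with a growing delay. AttributeError is
# included on the assumption that the "reader is None" race shown in the
# traceback above can surface to the caller this way.
async def connect_with_retry(connect_fn, retries=5, delay=1.0):
    for attempt in range(retries):
        try:
            return await connect_fn()
        except (ConnectionError, AttributeError):
            await asyncio.sleep(delay * (attempt + 1))  # simple linear backoff
    raise ConnectionError("could not connect after %d attempts" % retries)
```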
I tried asynqp at first, but for some reason they don't allow coroutines to be message handlers, which is insane (that defeats the purpose of the library, don't you think?).
Your library, on the other hand, is sane enough to allow it, but the API...
I'll give examples.
channel.exchange_declare(exchange_name='logs', type_name='fanout')
would read better as:
channel.declare_exchange(name='logs', type='fanout')
And if you want to be anal and explicit, make type an Enum.

result = yield from channel.queue(queue_name='', exclusive=True)
queue_name = result['queue']
yield from channel.queue_bind(exchange_name='logs', queue_name=queue_name, routing_key='')

result is a Mapping? What? It should at least be a namedtuple. A Mapping is a thing that can map arbitrary keys to arbitrary values; that's not what you're doing here. Your task is more specific than that.
But it gets better. To get the queue name you need to look up the 'queue' key. Why not 'name' at least?
queue_bind takes an exchange name, but not an exchange object. That's also non-intuitive and inconvenient.

return (yield from self._write_frame_awaiting_response(
    'exchange_delete', frame, request, no_wait, timeout=timeout))

You know, when you send TCP frames somewhere and then await a response, there's a name for it: a request. return await self.send_request('exchange_delete', frame, ...)
That's just from skimming the sources.
I hope you won't be upset by my critique. I'd like to say that it comes from amusement: it's not obvious to me how you could handle all the nitty-gritty low-level details like TCP sockets, sending frames, serialization, etc. without problems, yet fail to provide an intuitive, easy-to-use and beautiful API.
Usecase: my Python application is connecting to multiple RabbitMQ servers. These are non-clustered. Our events are sharded across these servers. We want to consume events from all these servers. These events need to be ack'ed.
Problem:
channel.basic_consume(queue_name, callback=callback)
callback looks like:
def callback(body, envelope, properties):
thus, inside the callback, you have no idea which server the event came from.
Ticket #39 suggests creating a separate channel to ack. Problem: I don't know which server to create the channel for.
Initially I thought I just create a lambda for the callback and bind the ack-channel, but then I found:
ff74ee7
So that's probably not going to work out. Wonder why this change was made anyway.
I'm pretty new to Python, so I'm not sure what I'm supposed to do in this case. Should I wrap these functions in objects, so the callback can point to a member function of the object? The object would then have access to the server/channel, and we'd be able to send an ack.
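For the wrapping-in-objects idea, a minimal sketch could look like the following. It assumes the three-argument callback signature from this issue and aioamqp's basic_client_ack for acking; the class and names are otherwise hypothetical:

```python
import asyncio

# Hedged sketch: one consumer object per server, so the bound-method callback
# knows which channel (and therefore which server) it belongs to.
class ServerConsumer:
    def __init__(self, server_name, channel):
        self.server_name = server_name
        self.channel = channel  # channel opened on that specific server

    async def callback(self, body, envelope, properties):
        # `self` tells us which server the message came from.
        print("message from", self.server_name, body)
        await self.channel.basic_client_ack(envelope.delivery_tag)

# Wiring, once per server (bound methods are valid coroutine callbacks):
#   consumer = ServerConsumer("shard-1", channel)
#   await channel.basic_consume(queue_name, callback=consumer.callback)
```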
aioamqp 0.7.0
RabbitMQ 3.6.1
Got an exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 262, in run
yield from self.dispatch_frame()
File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 217, in dispatch_frame
yield from channel.dispatch_frame(frame)
File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 111, in dispatch_frame
yield from methods(frame.class_id, frame.method_id)
File "/usr/lib/python3.5/asyncio/coroutines.py", line 200, in coro
res = func(*args, **kw)
File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 665, in server_basic_cancel
consumer_tag = frame.arguments['consumer_tag']
KeyError: 'consumer_tag'
And the client has stopped consuming messages after it.
I don't see any way to handle this situation, because the exception is swallowed in AmqpProtocol.run.
Hi,
I'm currently thinking of removing python 3.3 support.
What if aioamqp 0.9 drops it ?
Currently, received frames are decoded and forwarded to the correct channel.
In the case of content-body frames, those frames are stacked in an asyncio Queue, but I think that's not very efficient because of the memory waste.
Is there an efficient way to avoid callbacks when consuming messages?
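As a user-level workaround, the callback API can be bridged to a pull-style loop. This doesn't address the internal memory concern; everything except `channel.basic_consume` is an illustrative sketch:

```python
import asyncio

# Hedged sketch: the callback only enqueues, and the application pulls
# messages from the returned queue instead of working inside the callback.
async def start_consumer(channel, queue_name):
    messages = asyncio.Queue()

    async def on_message(body, envelope, properties):
        await messages.put((body, envelope, properties))

    await channel.basic_consume(queue_name, callback=on_message)
    return messages

# The application then loops:
#   body, envelope, properties = await messages.get()
```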
As per the AMQP docs, it is suggested to open one connection and any number of channels. There is a huge performance improvement with the 1 connection - n channels pattern. It would be worth maintaining a pool of AMQP channels.
I am interested in sending a pull request. Please let me know if any more info is required.
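A pool along those lines could be sketched as follows. `protocol.channel()` is aioamqp's documented way to open a channel; the pool itself is a hypothetical illustration:

```python
import asyncio

# Hedged sketch of the "1 connection, n channels" pattern: channels are opened
# lazily up to a fixed size, then reused via a queue.
class ChannelPool:
    def __init__(self, protocol, size):
        self._protocol = protocol
        self._size = size
        self._channels = asyncio.Queue()
        self._opened = 0

    async def acquire(self):
        # Open a new channel while under the limit and none are idle.
        if self._opened < self._size and self._channels.empty():
            self._opened += 1
            return await self._protocol.channel()
        return await self._channels.get()

    async def release(self, channel):
        await self._channels.put(channel)
```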
Are there any example projects you could refer to which have tests for code using aioamqp?
Is there any documentation about that?
Is there an easy way to mock RabbitMQ (Exchanges, Queues, ...). e.g. so I'm able to manually put messages into a queue or just trigger the consumer handler?
Do you have other noteworthy comments for testing aioamqp code?
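One lightweight approach is to skip mocking RabbitMQ entirely and unit-test the consumer callback directly with stub arguments; the callback and stubs below are hypothetical examples, not aioamqp API:

```python
import asyncio
import unittest.mock

# Hedged sketch: an application callback under test (hypothetical).
async def handle_order(body, envelope, properties):
    return body.decode().upper()

def test_handle_order():
    # A Mock stands in for aioamqp's Envelope; only delivery_tag exists here.
    envelope = unittest.mock.Mock(delivery_tag=1)
    result = asyncio.run(handle_order(b"order-42", envelope, properties=None))
    assert result == "ORDER-42"
```

Mocking at this level keeps tests fast and avoids needing a broker; integration against a real RabbitMQ can then be a smaller, separate suite.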
I have a big stream of data, and at first I tried to push one data item per publish call.
But periodically happened this error:
Server closed connection: UNEXPECTED_FRAME - expected content body, got non content body frame instead, code=505, class_id=60, method_id=40
In server logs:
Error on AMQP connection <0.11901.881> (172.30.0.83:56306 -> 172.17.0.26:5672, vhost: 'sync', user: 'producer', state: running), channel 1:
operation basic.publish caused a connection exception unexpected_frame: "expected content body, got non content body frame instead"
It was about 1,000 - 10,000 items / sec.
Then I started grouping items and sending them in batches (100 - 300 / sec). The error went away.
Is there a limit? :)
How can I fix it?
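One plausible cause (an assumption, not confirmed by the maintainers) is that concurrent publish() calls on the same channel interleave their method/header/body frames, which would produce exactly this UNEXPECTED_FRAME error. A sketch that serializes publishes per channel:

```python
import asyncio

# Hedged sketch: hold a per-channel lock so the method, header and body frames
# of one publish are never interleaved with another publish on that channel.
class SerializedPublisher:
    def __init__(self, channel):
        self._channel = channel
        self._lock = asyncio.Lock()

    async def publish(self, payload, exchange_name, routing_key):
        async with self._lock:
            await self._channel.publish(payload, exchange_name, routing_key)
```

Batching, as you observed, reduces the number of concurrent publishes and would mask the same race.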
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L53
=> Instantiating an AmqpEncoder inside the AmqpEncoder looks suspicious to me. Why not work directly with the current instance? Otherwise, maybe consider a classmethod.
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L78
=> ValueError?
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L84
=> Rather than a cast:
if bit:
    byte_value |= (1 << arg_index)
Besides, this writes the bits from right to left (the opposite of the order in which they are given). The number of bits given should probably also be checked.
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L94
=> Careful, this is going to be endianness-dependent. To be independent of the underlying architecture, use a "<" in the format string (same for the functions below):
>>> struct.pack("<H", 1)
b'\x01\x00'
>>> struct.pack(">H", 1)
b'\x00\x01'
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L105
=> Shouldn't an encoding be specified for predictable behaviour?
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L119
=> Same remark as for write_table: I don't understand why another encoder is instantiated.
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L195
=> Same remarks about endianness. Behaviour must be independent of the machine.
https://github.com/Polyconseil/aioamqp/blob/bcz/connect_to_amqp/aioamqp/frame.py#L218
=> Doesn't this return anything?
Would you like to add documentation/examples for creating an RPC server/client?
Today I installed aioamqp and the version is 0.6.0.