Comments (21)
Thank you for this bug report.
The open question is why the future was cancelled, but this shows that we need to check the futures more carefully.
Can you add more details on how you get this error? Under what conditions?
Thank you.
from aioamqp.
Sure. I'm using aioamqp inside a web app to spawn background tasks on another machine. I create a new channel in each request and then publish to an exchange, like this:
channel = yield from protocol.channel()
yield from channel.publish("test", exchange_name='workers', routing_key='task1')
The issue appears when I run ApacheBench against localhost with -n 10000 and -c 1000:
$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
apr_socket_recv: Connection reset by peer (104)
Total of 9873 requests completed
I think the -n and -c values needed to trigger it will differ on each machine.
I have reduced the code to
- get request,
- publish to exchange,
but I still get the same issue.
Regards
from aioamqp.
Is there anything interesting in your AMQP broker's logs? What do they say?
from aioamqp.
It's RabbitMQ. Nothing interesting there, only connections being opened and closed.
I have noticed that it does not happen every time (although ApacheBench always fails with "apr_socket_recv: Connection reset by peer (104)"), so it seems that when the HTTP server breaks, the future is cancelled and some kind of race condition follows.
Perhaps it's not your fault, and you only need to check the future's state to behave gracefully when the program breaks.
from aioamqp.
I'll try something similar tomorrow afternoon. Which asyncio HTTP server are you using? aiohttp.web?
from aioamqp.
Yes, it's aiohttp. This minimal code fails too:
import asyncio
import aioamqp
import textwrap
from aiohttp.web import Application, Response, StreamResponse
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="Port")
args = parser.parse_args()
port = args.port or 8080

def index(request):
    channel = yield from protocol.channel()
    yield from channel.publish("test", exchange_name='workers', routing_key='work1')
    return Response(body=b'OK')

@asyncio.coroutine
def init(loop):
    global transport, protocol
    transport, protocol = yield from aioamqp.connect()
    app = Application(loop=loop)
    app.router.add_route('GET', '/', index)
    handler = app.make_handler()
    srv = yield from loop.create_server(handler, 'localhost', port)
    print("Server started at http://localhost:" + str(port))
    return srv, handler

transport, protocol = None, None
loop = asyncio.get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(handler.finish_connections())
The exchanges and queues are created beforehand and are durable.
from aioamqp.
index() creates a new channel on every request, but never closes any of them. Does the problem reproduce with a yield from channel.close() before return Response()?
This doesn't look like protocol abuse, since the server gladly creates the new channels, but it might be related to the error.
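To illustrate the suggestion, here is a minimal sketch of a handler that always closes its channel, even when publishing fails. FakeProtocol and FakeChannel are hypothetical stand-ins written for this example (they are not aioamqp classes), and the sketch uses async/await rather than the yield from style of the script above so that it runs on current Python versions.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for an AMQP channel; it only tracks open/closed state."""

    def __init__(self, registry):
        self._registry = registry
        registry.add(self)

    async def publish(self, payload, exchange_name, routing_key):
        if payload is None:
            raise ValueError("nothing to publish")

    async def close(self):
        self._registry.discard(self)


class FakeProtocol:
    """Hypothetical stand-in for the connection object that hands out channels."""

    def __init__(self):
        self.open_channels = set()

    async def channel(self):
        return FakeChannel(self.open_channels)


async def handle_request(protocol, payload):
    channel = await protocol.channel()
    try:
        await channel.publish(payload, exchange_name='workers', routing_key='task1')
        return b'OK'
    finally:
        # Runs on success and on failure, so no channel is left open.
        await channel.close()


async def demo():
    protocol = FakeProtocol()
    body = await handle_request(protocol, "test")
    try:
        await handle_request(protocol, None)  # publish raises here
    except ValueError:
        pass
    return body, len(protocol.open_channels)


result = asyncio.run(demo())
```

The try/finally keeps the number of open channels bounded by the number of in-flight requests instead of growing with every request served.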
from aioamqp.
Hello,
I used your script on my machine, I set index to be a coroutine, and added a yield from channel.close() in this coroutine.
results:
$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/
Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests
Server Software:
Server Hostname: localhost
Server Port: 8080
Document Path: /
Document Length: 2 bytes
Concurrency Level: 1000
Time taken for tests: 8.997 seconds
Complete requests: 10000
Failed requests: 0
Total transferred: 1310000 bytes
HTML transferred: 20000 bytes
Requests per second: 1111.43 #/sec
Time per request: 899.741 ms
Time per request: 0.900 [ms] (mean, across all concurrent requests)
Transfer rate: 142.18 [Kbytes/sec] received
Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0  491 1083.9      0   7013
Processing:    18  213  590.8    106   6504
Waiting:       18  209  590.8    102   6503
Total:         18  703 1408.4    115   8982
Percentage of the requests served within a certain time (ms)
50% 115
66% 165
75% 1105
80% 1118
90% 1315
95% 3069
98% 7494
99% 8925
100% 8982 (longest request)
from aioamqp.
Argh, I forgot channel.close(), but it still fails. Try it with higher numbers.
from aioamqp.
Hello again,
I ran 'ab' with:
$ ab -n 1000000 -c 10000 localhost:8080/
apr_socket_recv: Connection reset by peer (104)
Total of 69595 requests completed
And got an error when decoding the frame:
Error handling request
Traceback (most recent call last):
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/server.py", line 272, in start
    yield from self.handle_request(message, payload)
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/web.py", line 85, in handle_request
    resp = yield from handler(request)
  File "ai.py", line 16, in index
    channel = yield from protocol.channel()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/protocol.py", line 306, in channel
    yield from channel.open()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 127, in open
    yield from self._write_frame(frame, request, no_wait=False, timeout=timeout, no_check_open=True)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 111, in _write_frame
    frame.write_frame(request)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/frame.py", line 385, in write_frame
    header = struct.pack('!BHI', self.frame_type, self.channel, payload.tell() + len(content_header))
struct.error: 'H' format requires 0 <= number <= 65535
I need to check the docs to verify the frame parsing. Can you test against the latest master?
from aioamqp.
aioamqp cannot reuse previous channel ids for now.
I created issue #36
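For context, the struct.error in the traceback comes from packing a channel number above 65535 into the 16-bit 'H' field of the frame header. The sketch below reproduces that overflow and shows one possible way to recycle ids. ChannelIdPool and pack_header are hypothetical names made up for this illustration; this is not how aioamqp allocates channel ids.

```python
import heapq
import struct


class ChannelIdPool:
    """Hypothetical allocator: hands out the lowest free id and takes released ids back."""

    def __init__(self, max_id=65535):
        self._max_id = max_id
        self._next_unused = 1  # channel 0 is reserved for the connection itself
        self._released = []    # min-heap of ids that were given back

    def acquire(self):
        if self._released:
            return heapq.heappop(self._released)
        if self._next_unused > self._max_id:
            raise RuntimeError("no free channel id")
        channel_id = self._next_unused
        self._next_unused += 1
        return channel_id

    def release(self, channel_id):
        heapq.heappush(self._released, channel_id)


def pack_header(frame_type, channel_id, size):
    # Same format string as in the traceback: 1-byte type, 2-byte channel, 4-byte size.
    return struct.pack('!BHI', frame_type, channel_id, size)


# An ever-increasing counter eventually produces an id that does not fit in 16 bits.
try:
    pack_header(1, 65536, 0)
    overflowed = False
except struct.error:
    overflowed = True

# With reuse, opening and closing 100000 channels one after another stays in range.
pool = ChannelIdPool()
highest = 0
for _ in range(100000):
    channel_id = pool.acquire()
    highest = max(highest, channel_id)
    pool.release(channel_id)

header = pack_header(1, highest, 0)
```

A pool like this only helps if every channel is actually closed and its id released; channels that are never closed would still exhaust the range.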
from aioamqp.
The latest master still fails. Remember that it does not show the error every time; it appears after two or three test runs.
from aioamqp.
I'll retest this once the library reuses channel ids.
from aioamqp.
I also encountered this. (Or at least I think I did.) I take it that fixing the problem is more complicated than just adding an if fut.cancelled() check in channel.py?
https://docs.python.org/3/library/asyncio-dev.html?highlight=cancelled#cancellation
from aioamqp.
Here's a log where the problem occurs. It looks like the future returned by self._get_waiter('close') in channel.py is cancelled, so self._get_waiter('close').set_result(True) raises an exception.
Dec 28 08:51:02 etna docker[25369]: ERROR:aioamqp.protocol:error on dispatch
Dec 28 08:51:02 etna docker[25369]: Traceback (most recent call last):
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 256, in run
Dec 28 08:51:02 etna docker[25369]: yield from self.dispatch_frame()
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 211, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from self.channels[frame.channel].dispatch_frame(frame)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from methods[(frame.class_id, frame.method_id)](frame)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/coroutines.py", line 206, in coro
Dec 28 08:51:02 etna docker[25369]: res = func(*args, **kw)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 165, in close_ok
Dec 28 08:51:02 etna docker[25369]: self._get_waiter('close').set_result(True)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/futures.py", line 329, in set_result
Dec 28 08:51:02 etna docker[25369]: raise InvalidStateError('{}: {!r}'.format(self._state, self))
Dec 28 08:51:02 etna docker[25369]: asyncio.futures.InvalidStateError: CANCELLED: <Future cancelled>
I think this might be occurring in a case where there are two attempts to close the channel.
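The failure in the log can be reproduced with plain asyncio, independent of aioamqp: calling set_result() on a future that has already been cancelled raises InvalidStateError. The sketch below shows that, along with one possible guard. resolve_waiter is a hypothetical helper name chosen for this example, not an aioamqp function.

```python
import asyncio


def resolve_waiter(fut, value=True):
    """Set a result only if the future is still pending.

    Returns True if the result was set, False if the future was
    already done (cancelled, or resolved by an earlier call).
    """
    if fut.done():  # a cancelled future also reports done()
        return False
    fut.set_result(value)
    return True


async def demo():
    loop = asyncio.get_running_loop()

    # Unguarded: this is the situation shown in the log above.
    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        cancelled.set_result(True)
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # Guarded: a cancelled future is skipped, a pending one is resolved.
    pending = loop.create_future()
    skipped = resolve_waiter(cancelled)
    resolved = resolve_waiter(pending)
    return raised, skipped, resolved, pending.result()


outcome = asyncio.run(demo())
```

Checking done() rather than only cancelled() also covers the case where the same waiter is resolved twice. A guard like this hides the symptom; it does not explain why the waiter was cancelled in the first place.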
from aioamqp.
Hello @ariddell,
would you please paste some code? It seems you had already closed the channel?
from aioamqp.
I'm not doing anything sophisticated, just a simple RPC setup; no multi-threading just asyncio. If I do have two coroutines that both close the connection/channel there shouldn't be an error, right?
I'll see if I can't figure out a way to reproduce the error.
from aioamqp.
I have a few days to have a look right now. I can push a branch with a fix, but I would like to know how you're using aioamqp and how you're triggering this bug.
Thank you
from aioamqp.
I'm pretty sure I'm calling close on the connection and then close on a channel (associated with the connection). I know this is wrong but I think aioamqp might want to check on the future being cancelled.
In case you're looking for prior art, here is how aiohttp closes a websocket. They have a _closed variable that tracks state, and they return False if the connection is already closed.
@asyncio.coroutine
def close(self, *, code=1000, message=b''):
    if self._writer is None:
        raise RuntimeError('Call .prepare() first')

    if not self._closed:
        self._closed = True
        try:
            self._writer.close(code, message)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            self._close_code = 1006
            raise
        except Exception as exc:
            self._close_code = 1006
            self._exception = exc
            return True

        if self._closing:
            return True

        while True:
            try:
                msg = yield from asyncio.wait_for(
                    self._reader.read(),
                    timeout=self._timeout, loop=self._loop)
            except asyncio.CancelledError:
                self._close_code = 1006
                raise
            except Exception as exc:
                self._close_code = 1006
                self._exception = exc
                return True

            if msg.tp == MsgType.close:
                self._close_code = msg.data
                return True
    else:
        return False
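Reduced to its core, the pattern above is a state flag that makes close() idempotent. Here is a minimal sketch of that idea for a channel-like object. ClosableChannel and close_frames_sent are hypothetical names for illustration only, and asyncio.sleep(0) stands in for waiting on the broker's reply.

```python
import asyncio


class ClosableChannel:
    """Hypothetical channel-like object whose close() is safe to call twice."""

    def __init__(self):
        self._closed = False
        self.close_frames_sent = 0

    async def close(self):
        if self._closed:
            return False  # someone else already closed, or is closing, the channel
        self._closed = True  # flip the flag before the first await
        self.close_frames_sent += 1
        await asyncio.sleep(0)  # placeholder for waiting on the close confirmation
        return True


async def demo():
    channel = ClosableChannel()
    # Two coroutines race to close the same channel.
    results = await asyncio.gather(channel.close(), channel.close())
    return results, channel.close_frames_sent


results, frames_sent = asyncio.run(demo())
```

Setting the flag before the first await matters: the second caller then sees the channel as closed even while the first caller is still waiting, so only one close request is sent.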
from aioamqp.
Hello @ariddell.
In aioamqp, the code is a little bit different: you get an exception when receiving the confirmation from the server (https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.close-ok), but the whole channel is already marked as closed.
Could you please tell me how you're triggering this behaviour? I'll dive into it and probably rework the way we close the channel.
Thank you.
from aioamqp.
I don't know how the exception is happening. I think it's something in a finally clause so it's not affecting my application. I'll keep you posted. Thanks for your work on this!
from aioamqp.