Comments (21)

dzen commented on August 15, 2024

Thank you for this bug report.

The real question is why the future was cancelled, but it does show that we need to check the futures more carefully.

Can you add more details on how you get this error, and under which conditions?
Thank you.

from aioamqp.

jcarmena commented on August 15, 2024

Sure. I'm using aioamqp inside a web app to spawn background tasks on another machine. I create a new channel for each request and then publish to an exchange, like this:

channel = yield from protocol.channel()
yield from channel.publish("test", exchange_name='workers', routing_key='task1')

The issue appears when I run ApacheBench against localhost with -n 10000 and -c 1000:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1528965 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
apr_socket_recv: Connection reset by peer (104)
Total of 9873 requests completed

I think the -n and -c values needed to trigger this will differ from machine to machine.
I have reduced the code to:

  1. get request,
  2. publish to exchange,

but the issue is still there.

Regards

dzen commented on August 15, 2024

Is there anything interesting in your AMQP broker's log? What does it say?

jcarmena commented on August 15, 2024

It's RabbitMQ. Nothing interesting there, only open/close connections.

I have noticed that it does not happen every time (although ApacheBench always fails with "apr_socket_recv: Connection reset by peer (104)"), so it seems that when the HTTP server breaks, the future is cancelled and some kind of race condition follows.

Perhaps it's not your fault, and you only need to check the future's state so that the library behaves gracefully when the program breaks.

dzen commented on August 15, 2024

I'll try something similar tomorrow afternoon. Which asyncio HTTP server are you using? aiohttp.web?

jcarmena commented on August 15, 2024

Yes, it's aiohttp. This minimal code fails too:

import asyncio
import aioamqp
import textwrap
from aiohttp.web import Application, Response, StreamResponse


import argparse
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", help="Port")
args = parser.parse_args()
port = args.port or 8080


def index(request):
    channel = yield from protocol.channel()
    yield from channel.publish("test", exchange_name='workers', routing_key='work1')
    return Response(body=b'OK')


@asyncio.coroutine
def init(loop):
    global transport, protocol
    transport, protocol = yield from aioamqp.connect()

    app = Application(loop=loop)
    app.router.add_route('GET', '/', index)

    handler = app.make_handler()
    srv = yield from loop.create_server(handler, 'localhost', port)
    print("Server started at http://localhost:" + str(port))
    return srv, handler


transport, protocol = None, None
loop = asyncio.get_event_loop()
srv, handler = loop.run_until_complete(init(loop))
try:
    loop.run_forever()
except KeyboardInterrupt:
    loop.run_until_complete(handler.finish_connections())

The exchanges and queues are created beforehand and are durable.

mwfrojdman commented on August 15, 2024

index() creates a new channel on every request, but never closes any of them. Does the problem reproduce with a yield from channel.close() before return Response()?

This doesn't sound like abuse of the protocol, since the server gladly creates the new channels, but it might be related to the error.
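
A minimal sketch of the suggestion above, closing the channel in a finally block so that it is released even when publishing fails. FakeProtocol and FakeChannel are hypothetical stand-ins that only count open channels; they are not aioamqp classes, and the sketch uses async/await rather than the yield from style of the thread.

```python
import asyncio


class FakeChannel:
    """Hypothetical stand-in for a channel; not aioamqp's class."""

    def __init__(self, protocol):
        self._protocol = protocol

    async def publish(self, payload, exchange_name, routing_key):
        # Simulate a failing publish so the finally branch is exercised.
        if payload is None:
            raise ValueError("nothing to publish")

    async def close(self):
        self._protocol.open_channels -= 1


class FakeProtocol:
    """Hypothetical stand-in for a connection; tracks how many channels are open."""

    def __init__(self):
        self.open_channels = 0

    async def channel(self):
        self.open_channels += 1
        return FakeChannel(self)


async def handle(protocol, payload):
    channel = await protocol.channel()
    try:
        await channel.publish(payload, exchange_name='workers', routing_key='work1')
    finally:
        # Runs on success and on failure, so no channel is left open.
        await channel.close()


async def demo():
    protocol = FakeProtocol()
    await handle(protocol, "test")
    try:
        await handle(protocol, None)
    except ValueError:
        pass
    return protocol.open_channels


leaked = asyncio.run(demo())
```

With this shape the number of open channels returns to zero after each request, whether or not the publish succeeded.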

dzen commented on August 15, 2024

Hello,

I ran your script on my machine after making index a coroutine and adding a yield from channel.close() to it.

Results:

$ ab -n 10000 -c 1000 localhost:8080/
This is ApacheBench, Version 2.3 <$Revision: 1604373 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking localhost (be patient)
Completed 1000 requests
Completed 2000 requests
Completed 3000 requests
Completed 4000 requests
Completed 5000 requests
Completed 6000 requests
Completed 7000 requests
Completed 8000 requests
Completed 9000 requests
Completed 10000 requests
Finished 10000 requests

Server Software:
Server Hostname: localhost
Server Port: 8080

Document Path: /
Document Length: 2 bytes

Concurrency Level: 1000
Time taken for tests: 8.997 seconds
Complete requests: 10000
Failed requests: 0
Total transferred: 1310000 bytes
HTML transferred: 20000 bytes
Requests per second: 1111.43 [#/sec] (mean)
Time per request: 899.741 [ms] (mean)
Time per request: 0.900 [ms] (mean, across all concurrent requests)
Transfer rate: 142.18 [Kbytes/sec] received

Connection Times (ms)
min mean[+/-sd] median max
Connect: 0 491 1083.9 0 7013
Processing: 18 213 590.8 106 6504
Waiting: 18 209 590.8 102 6503
Total: 18 703 1408.4 115 8982

Percentage of the requests served within a certain time (ms)
50% 115
66% 165
75% 1105
80% 1118
90% 1315
95% 3069
98% 7494
99% 8925
100% 8982 (longest request)

jcarmena commented on August 15, 2024

Argh, I forgot channel.close(), but it still fails. Try it with higher numbers.

dzen commented on August 15, 2024

Hello again,

I ran 'ab' with:

 $ ab -n 1000000 -c 10000 localhost:8080/

apr_socket_recv: Connection reset by peer (104)
Total of 69595 requests completed

And got an error when writing the frame:

Error handling request
Traceback (most recent call last):
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/server.py", line 272, in start
    yield from self.handle_request(message, payload)
  File "/home/benoit/test/venv34/lib/python3.4/site-packages/aiohttp/web.py", line 85, in handle_request
    resp = yield from handler(request)
  File "ai.py", line 16, in index
    channel = yield from protocol.channel()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/protocol.py", line 306, in channel
    yield from channel.open()
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 127, in open
    yield from self._write_frame(frame, request, no_wait=False, timeout=timeout, no_check_open=True)
  File "/usr/lib/python3.4/asyncio/coroutines.py", line 141, in coro
    res = func(*args, **kw)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/channel.py", line 111, in _write_frame
    frame.write_frame(request)
  File "/home/benoit/Projects/blue/aioamqp/aioamqp/frame.py", line 385, in write_frame
    header = struct.pack('!BHI', self.frame_type, self.channel, payload.tell() + len(content_header))
struct.error: 'H' format requires 0 <= number <= 65535

I need to check the doc to verify the frame format. Can you test against the latest master?
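
A minimal sketch of why the traceback above ends in struct.error, assuming only the header format visible in it ('!BHI'). The H field is an unsigned 16-bit integer, so a channel number above 65535 cannot be packed. The helper name pack_header is hypothetical and is not part of aioamqp.

```python
import struct

# Format taken from the traceback: frame type (1 byte),
# channel number (2 bytes, unsigned), payload size (4 bytes, unsigned).
FRAME_HEADER = '!BHI'


def pack_header(frame_type, channel, size):
    """Hypothetical helper that packs a frame header in network byte order."""
    return struct.pack(FRAME_HEADER, frame_type, channel, size)


# The largest channel number that fits in the 16-bit field still packs.
ok = pack_header(1, 65535, 0)

# One past the limit raises struct.error, as in the traceback.
try:
    pack_header(1, 65536, 0)
    overflowed = False
except struct.error:
    overflowed = True
```

If every request opens a channel with a fresh number, a long enough benchmark run would reach this limit.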

dzen commented on August 15, 2024

aioamqp cannot reuse previous channel ids for now.

I created issue #36.
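
One possible way to reuse channel ids, sketched under assumptions: ChannelIdPool is a hypothetical class, not how aioamqp implements it. It hands out ids starting at 1 (channel 0 is reserved for the connection in AMQP 0-9-1) and recycles released ids before minting new ones.

```python
class ChannelIdPool:
    """Hypothetical allocator: ids in 1..max_id, released ids are recycled."""

    def __init__(self, max_id=65535):
        self._max_id = max_id
        self._next = 1    # next never-used id
        self._free = []   # ids released by closed channels

    def acquire(self):
        # Prefer a recycled id so the counter grows only when needed.
        if self._free:
            return self._free.pop()
        if self._next > self._max_id:
            raise RuntimeError("no free channel id")
        channel_id = self._next
        self._next += 1
        return channel_id

    def release(self, channel_id):
        self._free.append(channel_id)


pool = ChannelIdPool(max_id=2)
first = pool.acquire()
second = pool.acquire()
pool.release(first)
reused = pool.acquire()
```

With such a pool the id only approaches the 16-bit limit when that many channels are open at the same time, not after that many requests.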

jcarmena commented on August 15, 2024

The latest master still fails. Remember that it does not show the error every time; it appears after two or three test runs.

dzen commented on August 15, 2024

I'll retest this once the library reuses channel ids.

ariddell commented on August 15, 2024

I also encountered this. (Or at least I think I did.) I take it that fixing the problem is more complicated than just adding a check if fut.cancelled() in channel.py?
https://docs.python.org/3/library/asyncio-dev.html?highlight=cancelled#cancellation

ariddell commented on August 15, 2024

Here's a log where the problem occurs. It looks like self._get_waiter('close') in channel.py is canceled, so self._get_waiter('close').set_result(True) raises an exception.

Dec 28 08:51:02 etna docker[25369]: ERROR:aioamqp.protocol:error on dispatch
Dec 28 08:51:02 etna docker[25369]: Traceback (most recent call last):
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 256, in run
Dec 28 08:51:02 etna docker[25369]: yield from self.dispatch_frame()
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/protocol.py", line 211, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from self.channels[frame.channel].dispatch_frame(frame)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
Dec 28 08:51:02 etna docker[25369]: yield from methods[(frame.class_id, frame.method_id)](frame)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/coroutines.py", line 206, in coro
Dec 28 08:51:02 etna docker[25369]: res = func(*args, **kw)
Dec 28 08:51:02 etna docker[25369]: File "/env/lib/python3.5/site-packages/aioamqp/channel.py", line 165, in close_ok
Dec 28 08:51:02 etna docker[25369]: self._get_waiter('close').set_result(True)
Dec 28 08:51:02 etna docker[25369]: File "/usr/lib/python3.5/asyncio/futures.py", line 329, in set_result
Dec 28 08:51:02 etna docker[25369]: raise InvalidStateError('{}: {!r}'.format(self._state, self))
Dec 28 08:51:02 etna docker[25369]: asyncio.futures.InvalidStateError: CANCELLED: <Future cancelled>

I think this might be occurring in a case where there are two attempts to close the channel.
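
The failure in the log can be reproduced with plain asyncio, and a guard avoids it. This is only a sketch: resolve_waiter is a hypothetical helper, not aioamqp code, and it uses async/await rather than the yield from style of the thread. Future.done() also returns True for a cancelled future, so a single check covers both cases.

```python
import asyncio


def resolve_waiter(fut, value=True):
    """Set the result unless the future is already done or cancelled.

    Returns True if the result was set, False if the future was skipped.
    """
    if fut.done():  # done() is also True for a cancelled future
        return False
    fut.set_result(value)
    return True


async def demo():
    loop = asyncio.get_running_loop()

    # Unguarded set_result() on a cancelled future raises InvalidStateError.
    cancelled = loop.create_future()
    cancelled.cancel()
    try:
        cancelled.set_result(True)
        raised = False
    except asyncio.InvalidStateError:
        raised = True

    # The guarded helper skips the cancelled future and resolves a pending one.
    pending = loop.create_future()
    skipped = resolve_waiter(cancelled)
    resolved = resolve_waiter(pending)
    return raised, skipped, resolved, pending.result()


results = asyncio.run(demo())
```

Whether silently skipping a cancelled waiter is the right behaviour for the library is a separate design question; the sketch only shows that the exception itself is avoidable.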

dzen commented on August 15, 2024

Hello @ariddell,

Would you please paste some code? It seems you had already closed the channel.

ariddell commented on August 15, 2024

I'm not doing anything sophisticated, just a simple RPC setup; no multi-threading, just asyncio. If I do have two coroutines that both close the connection/channel, there shouldn't be an error, right?

I'll see if I can't figure out a way to reproduce the error.

dzen commented on August 15, 2024

I have a few days to have a look right now. I can push a branch with a fix, but I would like to know how you're using aioamqp and how you're triggering this bug.

Thank you

ariddell commented on August 15, 2024

I'm pretty sure I'm calling close on the connection and then close on a channel (associated with the connection). I know this is wrong, but I think aioamqp might want to check whether the future has been cancelled.

In case you're looking for prior art, here is how aiohttp closes a websocket. They have a _closed variable that tracks state, and they return False if the connection is already closed.

https://github.com/KeepSafe/aiohttp/blob/e09b86204c9099389c530b2886770e0060a05f63/aiohttp/web_ws.py#L174

    @asyncio.coroutine
    def close(self, *, code=1000, message=b''):
        if self._writer is None:
            raise RuntimeError('Call .prepare() first')

        if not self._closed:
            self._closed = True
            try:
                self._writer.close(code, message)
            except (asyncio.CancelledError, asyncio.TimeoutError):
                self._close_code = 1006
                raise
            except Exception as exc:
                self._close_code = 1006
                self._exception = exc
                return True

            if self._closing:
                return True

            while True:
                try:
                    msg = yield from asyncio.wait_for(
                        self._reader.read(),
                        timeout=self._timeout, loop=self._loop)
                except asyncio.CancelledError:
                    self._close_code = 1006
                    raise
                except Exception as exc:
                    self._close_code = 1006
                    self._exception = exc
                    return True

                if msg.tp == MsgType.close:
                    self._close_code = msg.data
                    return True
        else:
            return False
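
The same idea reduced to a minimal sketch: a _closed flag that makes close() safe to call twice. ClosableChannel is a hypothetical class, not aioamqp's channel; the sleep stands in for waiting on the broker's close-ok, and the sketch uses async/await rather than yield from.

```python
import asyncio


class ClosableChannel:
    """Hypothetical channel whose close() is idempotent."""

    def __init__(self):
        self._closed = False
        self.close_frames_sent = 0

    async def close(self):
        if self._closed:
            # Already closed or closing: report it instead of raising.
            return False
        # Flip the flag before awaiting so a concurrent caller sees it.
        self._closed = True
        self.close_frames_sent += 1
        await asyncio.sleep(0)  # stands in for waiting on the close-ok reply
        return True


async def demo():
    channel = ClosableChannel()
    # Two coroutines race to close the same channel.
    outcomes = await asyncio.gather(channel.close(), channel.close())
    return outcomes, channel.close_frames_sent


outcomes, frames = asyncio.run(demo())
```

Only the first caller sends a close request; the second returns False without touching any waiter.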

dzen commented on August 15, 2024

Hello @ariddell.

In aioamqp, the code is a little bit different: you get the exception when the confirmation from the server arrives (https://www.rabbitmq.com/amqp-0-9-1-reference.html#connection.close-ok), but by then the whole channel is already marked as closed.

Could you please tell me how you're triggering this behaviour? I'll dive into it and probably rework the way we close the channel.

Thank you.

ariddell commented on August 15, 2024

I don't know how the exception is happening. I think it's something in a finally clause so it's not affecting my application. I'll keep you posted. Thanks for your work on this!
