pytest-dev / pytest-asyncio
Asyncio support for pytest
Home Page: https://pytest-asyncio.readthedocs.io
License: Apache License 2.0
I have functional tests for my system which require several components running on different ports.
As I see it, unused_tcp_port has function scope, so it's impossible to acquire two different ports (for different purposes, of course) in the same test.
Or maybe I missed something?
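One workaround is a factory-style helper that hands out a fresh port on every call, so one test can request several distinct ports. A minimal sketch of the idea (the fixture wiring is an assumption; later pytest-asyncio versions ship a similar `unused_tcp_port_factory` fixture):

```python
import socket

def make_port_factory():
    """Return a callable that picks a distinct unused TCP port on each call."""
    produced = set()

    def factory():
        while True:
            # Binding to port 0 asks the OS for any currently free port.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.bind(("127.0.0.1", 0))
                port = sock.getsockname()[1]
            # Retry if the OS handed back a port we already gave out.
            if port not in produced:
                produced.add(port)
                return port

    return factory
```

Registering `make_port_factory()` as a function-scoped fixture would let a single test acquire as many distinct ports as it needs.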
Sample test code:
tests.py
from unittest import mock
import pytest
import testmodule
@pytest.mark.asyncio
async def test_async_mock_patch_contextmanager():
    with mock.patch('testmodule.foo') as foo_mock:
        assert testmodule.foo is foo_mock

@pytest.mark.asyncio
@mock.patch('testmodule.foo')
async def test_async_mock_patch_decorator(foo_mock):
    assert testmodule.foo is foo_mock  # FAILS

def test_mock_patch_contextmanager():
    with mock.patch('testmodule.foo') as foo_mock:
        assert testmodule.foo is foo_mock

@mock.patch('testmodule.foo')
def test_mock_patch_decorator(foo_mock):
    assert testmodule.foo is foo_mock
testmodule.py
def foo():
    return 'Bar'
test_async_mock_patch_decorator will fail, because testmodule.foo is <function foo at ###>, not <MagicMock name='foo' id='###'>.
Tested this on Python 3.6 only.
UPDATE: it seems that on Python 3.5.2 there is no such bug; all tests succeed.
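A workaround that sidesteps the decorator issue is to apply the patch inside the coroutine body, where it stays active across awaits. As a decorator, mock.patch may undo the patch as soon as the wrapped function returns its coroutine object, i.e. before the event loop ever runs the body. A self-contained sketch (the stand-in `testmodule` is built inline here so the snippet is runnable):

```python
import asyncio
import sys
import types
from unittest import mock

# Stand-in for the testmodule above, registered so mock.patch can find it.
testmodule = types.ModuleType("testmodule")
testmodule.foo = lambda: 'Bar'
sys.modules["testmodule"] = testmodule

async def patched_coroutine():
    # Using the context manager inside the coroutine keeps the patch
    # in effect for the whole awaited execution.
    with mock.patch('testmodule.foo') as foo_mock:
        await asyncio.sleep(0)
        assert testmodule.foo is foo_mock
    # After the block, the original function is restored.
    assert testmodule.foo is not foo_mock

asyncio.run(patched_coroutine())
```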
I have a scenario that tests part of an app with a complex schedule of concurrent tasks, where I'm using asyncio.ensure_future. Here is a simplified example of it:
import asyncio
import pytest

async def a_second():
    await asyncio.sleep(0.1)

@pytest.mark.asyncio
async def test_very_lazy_task(event_loop):
    async def one():
        # we are waiting for this exception, but it is never caught
        pytest.fail('Got it!')

    # the loop=event_loop param does not influence the result at all
    asyncio.ensure_future(one(), loop=event_loop)
    await a_second()
    # uncommenting this shows that we actually have the 'Got it' error in our logs
    # pytest.fail('after all!')
Sadly, it seems that the function one falls outside of pytest-asyncio's sphere of influence: pytest.fail('Got it!') raises an exception, but it is never caught by pytest.
With pytest.fail('after all!') uncommented, I got:
============================================= FAILURES ==============================================
________________________________________ test_very_lazy_task ________________________________________
event_loop = <_UnixSelectorEventLoop running=False closed=True debug=False>
    @pytest.mark.asyncio
    async def test_very_lazy_task(event_loop):
        async def one():
            pytest.fail('Got it!')
        asyncio.ensure_future(one(), loop=event_loop)
        await a_second()
>       pytest.fail('after all!')
E       Failed: after all!
fb/conversation/test_story.py:125: Failed
--------------------------------------- Captured stderr setup ---------------------------------------
DEBUG:asyncio:Using selector: EpollSelector
---------------------------------------- Captured log setup -----------------------------------------
selector_events.py 53 DEBUG Using selector: EpollSelector
--------------------------------------- Captured stderr call ----------------------------------------
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<test_very_lazy_task.<locals>.one() done, defined at /usr/src/app/fb/conversation/test_story.py:120> exception=Got it!>
Traceback (most recent call last):
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/usr/src/app/fb/conversation/test_story.py", line 121, in one
pytest.fail('Got it!')
File "/usr/local/lib/python3.5/site-packages/_pytest/runner.py", line 486, in fail
raise Failed(msg=msg, pytrace=pytrace)
Failed: Got it!
----------------------------------------- Captured log call -----------------------------------------
base_events.py 1090 ERROR Task exception was never retrieved
future: <Task finished coro=<test_very_lazy_task.<locals>.one() done, defined at /usr/src/app/fb/conversation/test_story.py:120> exception=Got it!>
Traceback (most recent call last):
File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/usr/src/app/fb/conversation/test_story.py", line 121, in one
pytest.fail('Got it!')
File "/usr/local/lib/python3.5/site-packages/_pytest/runner.py", line 486, in fail
raise Failed(msg=msg, pytrace=pytrace)
Failed: Got it!
So what is the right way to catch exceptions inside coroutine functions that were launched by asyncio.ensure_future?
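One common fix (a generic asyncio sketch, not a pytest-asyncio feature) is to keep a reference to the task and await it before the test returns. Awaiting a finished task re-raises its stored exception, so it surfaces in the test instead of being logged as "Task exception was never retrieved":

```python
import asyncio

async def one():
    raise RuntimeError('Got it!')

async def scenario():
    # Keep a reference instead of firing and forgetting.
    task = asyncio.ensure_future(one())
    await asyncio.sleep(0.1)
    # Awaiting the finished task re-raises the exception it stored.
    try:
        await task
    except RuntimeError as exc:
        return str(exc)

message = asyncio.run(scenario())
```

With pytest-asyncio, the same pattern inside a marked test makes the background task's failure fail the test.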
While discussing pytest-dev/pytest#1793, I noticed that there is only one person allowed to upload to PyPI.
It would be nice to add a few more of the core pytest-dev team as PyPI maintainers.
I am trying to write a test with async code AND the unittest.TestCase base class. When I use TestCase, the test method always passes.
What am I doing wrong?
import pytest
import unittest
import aiohttp
import asyncio

URL = "http://www.github.com"

class TestOnUnittestClass(unittest.TestCase):
    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
        assert req.status == 201

class TestOnClass:
    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
        assert req.status == 201

@pytest.mark.asyncio
def test_async():
    req = (yield from aiohttp.request("GET", URL))
    assert req.status == 201
The output is:
$ py.test -vvvv async_on_class_test.py
===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.4.0 -- py-1.4.30 -- pytest-2.7.2 -- /home/paurullan/.virtualenvs/status/bin/python3
rootdir: /home/paurullan/projects/status-qa, inifile:
plugins: asyncio, xdist, colordots
collected 3 items
async_on_class_test.py::TestOnUnittestClass::test_get PASSED
async_on_class_test.py::TestOnClass::test_get FAILED
async_on_class_test.py::test_async FAILED
========================================================================================== FAILURES ===========================================================================================
____________________________________________________________________________________ TestOnClass.test_get _____________________________________________________________________________________
self = <async_on_class_test.TestOnClass object at 0x7f70e6e3e898>
    @asyncio.coroutine
    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
>       assert req.status == 201
E       assert 200 == 201
E        + where 200 = <ClientResponse(https://github.com/) [200 OK]>\n<CIMultiDictProxy {'SERVER': 'GitHub.com', 'DATE': 'Tue, 11 Aug 2015 15...S': 'deny', 'VARY': 'Accept-Encoding', 'X-SERVED-BY': '53e13e5d66f560f3e2b04e74a099de0d', 'CONTENT-ENCODING': 'gzip'}>\n.status
async_on_class_test.py:23: AssertionError
_________________________________________________________________________________________ test_async __________________________________________________________________________________________
    @pytest.mark.asyncio
    def test_async():
        req = (yield from aiohttp.request("GET", URL))
>       assert req.status == 201
E       assert 200 == 201
E        + where 200 = <ClientResponse(https://github.com/) [200 OK]>\n<CIMultiDictProxy {'SERVER': 'GitHub.com', 'DATE': 'Tue, 11 Aug 2015 15...S': 'deny', 'VARY': 'Accept-Encoding', 'X-SERVED-BY': 'a128136e4734a9f74c013356c773ece7', 'CONTENT-ENCODING': 'gzip'}>\n.status
async_on_class_test.py:28: AssertionError
============================================================================= 2 failed, 1 passed in 2.43 seconds ==============================================================================
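The silent TestCase pass appears to happen because unittest calls the generator-based test method but never iterates the returned generator, so the body (and any asserts in it) never executes. A minimal demonstration of that mechanism:

```python
executed = []

def test_like_generator():
    # Calling a generator function only builds a generator object;
    # this body runs only when the generator is iterated.
    executed.append(True)
    yield

gen = test_like_generator()  # nothing has executed yet
assert executed == []
```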
From the pluggy docs:
    If firstresult is True the 1:N hook call (N being the number of registered
    hook implementation functions) will stop at I<=N when the I'th function
    returns a non-None result.
https://github.com/pytest-dev/pytest/blob/master/_pytest/vendored_packages/pluggy.py#L89-L97
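The quoted 1:N behaviour can be sketched in a few lines (the names here are illustrative, not pluggy's internals):

```python
def call_firstresult(impls, *args):
    # Call hook implementations in order; stop at the first non-None result.
    for impl in impls:
        result = impl(*args)
        if result is not None:
            return result
    return None

impls = [lambda x: None, lambda x: x * 2, lambda x: x + 1]
first = call_firstresult(impls, 10)  # the second impl answers first
```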
I'm sure I'm doing something wrong, but I can't get tests working with pytest.mark.asyncio.
import pytest

@pytest.mark.asyncio
async def test_foo():
    assert False
Running py.test claims that test_foo was executed and passed.
Using pytest 2.9.2 and pytest-asyncio 0.4.0.
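This symptom is consistent with the coroutine never being awaited: calling an `async def` function only builds a coroutine object, and if nothing drives it, the body (including `assert False`) never runs, so the test appears to pass. A minimal demonstration:

```python
import asyncio

executed = []

async def test_foo():
    executed.append(True)
    assert False

coro = test_foo()   # builds a coroutine object; the body has not run
assert executed == []

failed = False
try:
    asyncio.run(coro)  # actually driving the coroutine runs the body
except AssertionError:
    failed = True
```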
I am trying to create a test case for the async function get_data below. I get a runtime warning: RuntimeWarning: coroutine 'TestAsyncStatSvc.test_async_get_data' was never awaited testfunction(**testargs).
Below is my code; please guide me on this one. AFAIK, we get this warning when there is no event loop, so I created one in the fixture. What is it that I am missing?
async def get_data(self, data_id=None):
    sql = """
        SELECT id, description
        FROM data_table
    """
    sql_params = []
    if data_id:
        sql += """
            WHERE id = $1
        """
        sql_params += [data_id]
    result = await self.dbproxy_async.execute_query_async(sql, sql_params)
    if result.empty and data_id:
        raise NotFound('data %s not found.' % data_id)
    return result.to_json()
Below is the test case:
class TestAsyncStatSvc(object):
    TEST_DATA = {
        'Key1': ['Key1', 'Data desc for key1'],
        'Key2': ['Key2', 'Data desc for key2'],
        None: [
            ['Key1', 'Data desc for key1'],
            ['Key2', 'Data desc for key2']
        ]
    }

    @pytest.mark.asyncio
    async def test_async_get_data(self, data_svc_fixture):
        for query_value in self.TEST_DATA.keys():
            execute_stub = MagicMock(return_value=self.TEST_DATA[query_value])
            # Wrap the stub in a coroutine (so it can be awaited)
            execute_coro = asyncio.coroutine(execute_stub)
            # Stub the database db_proxy
            db_proxy = MagicMock()
            db_proxy.execute_query_async = execute_coro
            result = await data_svc_fixture.get_data(data_id=query_value)
            assert result == self.TEST_DATA[query_value]
            if query_value is not None:
                assert len(comm) == 1
            else:
                assert len(comm) == 2
        with pytest.raises(NotFound):
            await data_svc_fixture.get_data('Unobtained')
And here are the fixtures:
class Context:
    def __init__(self, loop, svc_fixture):
        self.loop = loop
        self.svc_fixture = svc_fixture

@pytest.yield_fixture(scope="function")
def data_svc_fixture(db_proxy_fixture, get_service_fixture, my_svc_configurator_fixture, event_loop):
    ctx = Context(event_loop, get_service_fixture('MySvc'))
    yield ctx
    event_loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks(event_loop), return_exceptions=True))

@pytest.yield_fixture()
def event_loop():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    yield loop
    loop.close()
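The `asyncio.coroutine`-wrapping trick in the test above can also be written without the deprecated `asyncio.coroutine` by wrapping the MagicMock in a small `async def`. A sketch with illustrative names (the stub value and query are placeholders, not the reporter's real data):

```python
import asyncio
from unittest import mock

execute_stub = mock.MagicMock(return_value={'id': 'Key1'})

async def execute_query_async(*args, **kwargs):
    # Awaitable facade over the synchronous stub, so callers can `await` it.
    return execute_stub(*args, **kwargs)

async def caller():
    return await execute_query_async("SELECT 1", [])

row = asyncio.run(caller())
```

Attaching `execute_query_async` to a mocked proxy object gives an awaitable stub whose calls can still be asserted via the underlying MagicMock.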
So in a test you could do:

async def test_foo(tcp_client):
    client = await tcp_client(port=ip_factory(), ip="localhost")  # both arguments optional
    await my_server.start(client.port, client.ip)
    await client.send(b'some_byte')
    assert my_server.has_done_stuff()

async def test_bar(tcp_server):
    server = await tcp_server(port=ip_factory(), ip="localhost")  # both arguments optional
    # respond b'somebytes' when the incoming data matches the "on" predicate
    server.respond(b'somebytes', on=lambda request: True)
    client = await my_client(server.ip, server.port)
    assert client.send(b'some_byte')  # get the expected answer
    assert my_client.has_done_stuff()

Very useful for end-to-end testing.
Hi @Tinche,
I have set you up as owner of this repository, and a collaborator of @pytest-dev, so I think all is good. 😄
To complete the transition, here are a couple of things it would be nice to do: update the project metadata (setup.py, README, etc.) and the documentation. Feel free to ask for help with any of these, here or on the mailing list.
Welcome! 😄
Forgot to tag the 0.6.0 release?
I run many tests together, and the error 'TooManyConnectionsError' occurs:
.........E........EEEE..
==================================== ERRORS ====================================
______________________ ERROR at setup of test_demo_0409_2 ______________________
args = (), kwargs = {}
loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
setup = <function pytest_fixture_setup.<locals>.wrapper.<locals>.setup at 0x7f357c13f488>
    def wrapper(*args, **kwargs):
        loop = kwargs['event_loop']
        if strip_event_loop:
            del kwargs['event_loop']
        async def setup():
            res = await f(*args, **kwargs)
            return res
>       return loop.run_until_complete(setup())
/usr/local/lib/python3.6/site-packages/pytest_asyncio/plugin.py:123:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.6/asyncio/base_events.py:467: in run_until_complete
return future.result()
/usr/local/lib/python3.6/site-packages/pytest_asyncio/plugin.py:120: in setup
res = await f(*args, **kwargs)
../apps/conf/test/db.py:38: in pool_setup
return await db.create_pool(conn_str)
/usr/local/lib/python3.6/site-packages/gino/pool.py:147: in _async__init__
rv = await super()._async__init__()
/usr/local/lib/python3.6/site-packages/asyncpg/pool.py:349: in _async__init__
await asyncio.gather(*connect_tasks, loop=self._loop)
/usr/local/lib/python3.6/site-packages/asyncpg/pool.py:138: in connect
connection_class=self._pool._connection_class)
/usr/local/lib/python3.6/site-packages/asyncpg/connect_utils.py:274: in _connect_addr
await asyncio.wait_for(connected, loop=loop, timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
fut = <Future finished exception=TooManyConnectionsError('sorry, too many clients already',)>
timeout = 59.981232699006796
    @coroutine
    def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.
        Coroutine will be wrapped in Task.
        Returns result of the Future or coroutine. When a timeout occurs,
        it cancels the task and raises TimeoutError. To avoid the task
        cancellation, wrap it in shield().
        If the wait is cancelled, the task is also cancelled.
        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_event_loop()
        if timeout is None:
            return (yield from fut)
        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)
        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)
        try:
            # wait until the future completes or the timeout
            try:
                yield from waiter
            except futures.CancelledError:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise
            if fut.done():
>               return fut.result()
E               asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already
/usr/local/lib/python3.6/asyncio/tasks.py:358: TooManyConnectionsError
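A common cause of this error is that every test (or fixture) opens a fresh pool without closing it, eventually exhausting the server's connection limit; the usual fix is to share one pool across tests (e.g. a session-scoped fixture) or cap the pool size below the server limit. A toy sketch of the capping idea, using a semaphore instead of a real database (all names here are illustrative):

```python
import asyncio

class TinyPool:
    """Toy pool capping how many 'connections' are open at once."""
    def __init__(self, max_size):
        self._sem = asyncio.Semaphore(max_size)
        self.open_count = 0
        self.peak = 0  # track the highest simultaneous usage

    async def acquire(self):
        await self._sem.acquire()
        self.open_count += 1
        self.peak = max(self.peak, self.open_count)

    def release(self):
        self.open_count -= 1
        self._sem.release()

async def worker(pool):
    await pool.acquire()
    try:
        await asyncio.sleep(0.01)  # pretend to run a query
    finally:
        pool.release()

async def main():
    pool = TinyPool(max_size=5)
    # 20 concurrent workers, but never more than 5 open connections.
    await asyncio.gather(*(worker(pool) for _ in range(20)))
    return pool.peak

peak = asyncio.run(main())
```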
I opened #43 for this but it looks like it's a bit more involved due to some tests no longer passing.
Some of my test cases hang during execution while similar cases are fine, and the problem is reproducible. A problematic case runs smoothly if I step through it with PyCharm. I have no idea what causes the problem, so I hit Ctrl+C and ran with --fulltrace. Is it possible that there is some kind of problem in my code? Maybe I use asyncio in the wrong way? The details are pasted below; I hope they will be helpful.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! KeyboardInterrupt !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
config = <_pytest.config.Config object at 0x7ffaeb5bbef0>, doit = <function _main at 0x7ffaeb5197b8>
def wrap_session(config, doit):
"""Skeleton command line program"""
session = Session(config)
session.exitstatus = EXIT_OK
initstate = 0
try:
try:
config._do_configure()
initstate = 1
config.hook.pytest_sessionstart(session=session)
initstate = 2
> session.exitstatus = doit(config, session) or 0
env/local/lib/python3.5/site-packages/_pytest/main.py:98:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
config = <_pytest.config.Config object at 0x7ffaeb5bbef0>, session = <Session 'aredis'>
def _main(config, session):
""" default command line protocol for initialization, session,
running tests and reporting. """
config.hook.pytest_collection(session=session)
> config.hook.pytest_runtestloop(session=session)
env/local/lib/python3.5/site-packages/_pytest/main.py:133:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_HookCaller 'pytest_runtestloop'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}
def __call__(self, **kwargs):
assert not self.is_historic()
> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtestloop'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}
def _hookexec(self, hook, methods, kwargs):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook, methods, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook = <_HookCaller 'pytest_runtestloop'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}
self._inner_hookexec = lambda hook, methods, kwargs: \
> _MultiCall(methods, kwargs, hook.spec_opts).execute()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
return _wrapped_call(hook_impl.function(*args), self.execute)
> res = hook_impl.function(*args)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
session = <Session 'aredis'>
def pytest_runtestloop(session):
if (session.testsfailed and
not session.config.option.continue_on_collection_errors):
raise session.Interrupted(
"%d errors during collection" % session.testsfailed)
if session.config.option.collectonly:
return True
for i, item in enumerate(session.items):
nextitem = session.items[i+1] if i+1 < len(session.items) else None
> item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
env/local/lib/python3.5/site-packages/_pytest/main.py:154:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_HookCaller 'pytest_runtest_protocol'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}
def __call__(self, **kwargs):
assert not self.is_historic()
> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtest_protocol'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}
def _hookexec(self, hook, methods, kwargs):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook, methods, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook = <_HookCaller 'pytest_runtest_protocol'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}
self._inner_hookexec = lambda hook, methods, kwargs: \
> _MultiCall(methods, kwargs, hook.spec_opts).execute()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
> return _wrapped_call(hook_impl.function(*args), self.execute)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
wrap_controller = <generator object pytest_runtest_protocol at 0x7ffae90384c0>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>
def _wrapped_call(wrap_controller, func):
""" Wrap calling to a function with a generator which needs to yield
exactly once. The yield point will trigger calling the wrapped function
and return its _CallOutcome to the yield point. The generator then needs
to finish (raise StopIteration) in order for the wrapped call to complete.
"""
try:
next(wrap_controller) # first yield
except StopIteration:
_raise_wrapfail(wrap_controller, "did not yield")
call_outcome = _CallOutcome(func)
try:
wrap_controller.send(call_outcome)
_raise_wrapfail(wrap_controller, "has second yield")
except StopIteration:
pass
> return call_outcome.get_result()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dcddd8>
def get_result(self):
if self.excinfo is None:
return self.result
else:
ex = self.excinfo
if _py3:
> raise ex[1].with_traceback(ex[2])
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dcddd8>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>
def __init__(self, func):
try:
> self.result = func()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
return _wrapped_call(hook_impl.function(*args), self.execute)
> res = hook_impl.function(*args)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function 'test_channel_message_handler'>, nextitem = None
def pytest_runtest_protocol(item, nextitem):
item.ihook.pytest_runtest_logstart(
nodeid=item.nodeid, location=item.location,
)
> runtestprotocol(item, nextitem=nextitem)
env/local/lib/python3.5/site-packages/_pytest/runner.py:66:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function 'test_channel_message_handler'>, log = True, nextitem = None
def runtestprotocol(item, log=True, nextitem=None):
hasrequest = hasattr(item, "_request")
if hasrequest and not item._request:
item._initrequest()
rep = call_and_report(item, "setup", log)
reports = [rep]
if rep.passed:
if item.config.option.setupshow:
show_test_item(item)
if not item.config.option.setuponly:
> reports.append(call_and_report(item, "call", log))
env/local/lib/python3.5/site-packages/_pytest/runner.py:79:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function 'test_channel_message_handler'>, when = 'call', log = True, kwds = {}
def call_and_report(item, when, log=True, **kwds):
> call = call_runtest_hook(item, when, **kwds)
env/local/lib/python3.5/site-packages/_pytest/runner.py:133:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function 'test_channel_message_handler'>, when = 'call', kwds = {}, hookname = 'pytest_runtest_call'
def call_runtest_hook(item, when, **kwds):
hookname = "pytest_runtest_" + when
ihook = getattr(item.ihook, hookname)
> return CallInfo(lambda: ihook(item=item, **kwds), when=when)
env/local/lib/python3.5/site-packages/_pytest/runner.py:151:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <[AttributeError("'CallInfo' object has no attribute 'result'") raised in repr()] CallInfo object at 0x7ffae8dd4128>
func = <function call_runtest_hook.<locals>.<lambda> at 0x7ffae905dae8>, when = 'call'
def __init__(self, func, when):
#: context of invocation: one of "setup", "call",
#: "teardown", "memocollect"
self.when = when
self.start = time()
try:
> self.result = func()
env/local/lib/python3.5/site-packages/_pytest/runner.py:163:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
> return CallInfo(lambda: ihook(item=item, **kwds), when=when)
env/local/lib/python3.5/site-packages/_pytest/runner.py:151:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_HookCaller 'pytest_runtest_call'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}
def __call__(self, **kwargs):
assert not self.is_historic()
> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtest_call'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}
def _hookexec(self, hook, methods, kwargs):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook, methods, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook = <_HookCaller 'pytest_runtest_call'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}
self._inner_hookexec = lambda hook, methods, kwargs: \
> _MultiCall(methods, kwargs, hook.spec_opts).execute()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
> return _wrapped_call(hook_impl.function(*args), self.execute)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
wrap_controller = <generator object pytest_runtest_call at 0x7ffaea4b4f10>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>
def _wrapped_call(wrap_controller, func):
""" Wrap calling to a function with a generator which needs to yield
exactly once. The yield point will trigger calling the wrapped function
and return its _CallOutcome to the yield point. The generator then needs
to finish (raise StopIteration) in order for the wrapped call to complete.
"""
try:
next(wrap_controller) # first yield
except StopIteration:
_raise_wrapfail(wrap_controller, "did not yield")
call_outcome = _CallOutcome(func)
try:
wrap_controller.send(call_outcome)
_raise_wrapfail(wrap_controller, "has second yield")
except StopIteration:
pass
> return call_outcome.get_result()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd4240>
def get_result(self):
if self.excinfo is None:
return self.result
else:
ex = self.excinfo
if _py3:
> raise ex[1].with_traceback(ex[2])
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd4240>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>
def __init__(self, func):
try:
> self.result = func()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
return _wrapped_call(hook_impl.function(*args), self.execute)
> res = hook_impl.function(*args)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
item = <Function 'test_channel_message_handler'>
def pytest_runtest_call(item):
try:
> item.runtest()
env/local/lib/python3.5/site-packages/_pytest/runner.py:104:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Function 'test_channel_message_handler'>
def runtest(self):
""" execute the underlying test function. """
> self.ihook.pytest_pyfunc_call(pyfuncitem=self)
env/local/lib/python3.5/site-packages/_pytest/python.py:1574:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_HookCaller 'pytest_pyfunc_call'>
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}
def __call__(self, **kwargs):
assert not self.is_historic()
> return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_pyfunc_call'>
methods = [<_pytest.vendored_packages.pluggy.HookImpl object at 0x7ffaeb4b8e10>]
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}
def _hookexec(self, hook, methods, kwargs):
# called from all hookcaller instances.
# enable_tracing will set its own wrapping function at self._inner_hookexec
> return self._inner_hookexec(hook, methods, kwargs)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
hook = <_HookCaller 'pytest_pyfunc_call'>, methods = [<_pytest.vendored_packages.pluggy.HookImpl object at 0x7ffaeb4b8e10>]
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}
self._inner_hookexec = lambda hook, methods, kwargs: \
> _MultiCall(methods, kwargs, hook.spec_opts).execute()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
> return _wrapped_call(hook_impl.function(*args), self.execute)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
wrap_controller = <generator object pytest_pyfunc_call at 0x7ffae9046e60>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>>
def _wrapped_call(wrap_controller, func):
""" Wrap calling to a function with a generator which needs to yield
exactly once. The yield point will trigger calling the wrapped function
and return its _CallOutcome to the yield point. The generator then needs
to finish (raise StopIteration) in order for the wrapped call to complete.
"""
try:
next(wrap_controller) # first yield
except StopIteration:
_raise_wrapfail(wrap_controller, "did not yield")
call_outcome = _CallOutcome(func)
try:
wrap_controller.send(call_outcome)
_raise_wrapfail(wrap_controller, "has second yield")
except StopIteration:
pass
> return call_outcome.get_result()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd42b0>
def get_result(self):
if self.excinfo is None:
return self.result
else:
ex = self.excinfo
if _py3:
> raise ex[1].with_traceback(ex[2])
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd42b0>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>>
def __init__(self, func):
try:
> self.result = func()
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>
def execute(self):
all_kwargs = self.kwargs
self.results = results = []
firstresult = self.specopts.get("firstresult")
while self.hook_impls:
hook_impl = self.hook_impls.pop()
try:
args = [all_kwargs[argname] for argname in hook_impl.argnames]
except KeyError:
for argname in hook_impl.argnames:
if argname not in all_kwargs:
raise HookCallError(
"hook call must provide argument %r" % (argname,))
if hook_impl.hookwrapper:
return _wrapped_call(hook_impl.function(*args), self.execute)
> res = hook_impl.function(*args)
env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
pyfuncitem = <Function 'test_channel_message_handler'>
@pytest.mark.tryfirst
def pytest_pyfunc_call(pyfuncitem):
"""
Run asyncio marked test functions in an event loop instead of a normal
function call.
"""
for marker_name, fixture_name in _markers_2_fixtures.items():
if marker_name in pyfuncitem.keywords:
event_loop = pyfuncitem.funcargs[fixture_name]
funcargs = pyfuncitem.funcargs
testargs = {arg: funcargs[arg]
for arg in pyfuncitem._fixtureinfo.argnames}
event_loop.run_until_complete(
> asyncio.async(pyfuncitem.obj(**testargs), loop=event_loop))
env/local/lib/python3.5/site-packages/pytest_asyncio/plugin.py:77:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>
future = <Task pending coro=<test_channel_message_handler() running at /home/cm/work/aredis/tests/test_pubsub.py:277> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]>
def run_until_complete(self, future):
"""Run until the Future is done.
If the argument is a coroutine, it is wrapped in a Task.
WARNING: It would be disastrous to call run_until_complete()
with the same coroutine twice -- it would wrap it in two
different Tasks and that can't be good.
Return the Future's result, or raise its exception.
"""
self._check_closed()
new_task = not isinstance(future, futures.Future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
# is no need to log the "destroy pending task" message
future._log_destroy_pending = False
future.add_done_callback(_run_until_complete_cb)
try:
> self.run_forever()
/usr/lib/python3.5/asyncio/base_events.py:375:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>
def run_forever(self):
"""Run until stop() is called."""
self._check_closed()
if self.is_running():
raise RuntimeError('Event loop is running.')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
try:
while True:
> self._run_once()
/usr/lib/python3.5/asyncio/base_events.py:345:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <_UnixSelectorEventLoop running=False closed=False debug=False>
def _run_once(self):
"""Run one full iteration of the event loop.
This calls all currently ready callbacks, polls for I/O,
schedules the resulting callbacks, and finally schedules
'call_later' callbacks.
"""
sched_count = len(self._scheduled)
if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
self._timer_cancelled_count / sched_count >
_MIN_CANCELLED_TIMER_HANDLES_FRACTION):
# Remove delayed calls that were cancelled if their number
# is too high
new_scheduled = []
for handle in self._scheduled:
if handle._cancelled:
handle._scheduled = False
else:
new_scheduled.append(handle)
heapq.heapify(new_scheduled)
self._scheduled = new_scheduled
self._timer_cancelled_count = 0
else:
# Remove delayed calls that were cancelled from head of queue.
while self._scheduled and self._scheduled[0]._cancelled:
self._timer_cancelled_count -= 1
handle = heapq.heappop(self._scheduled)
handle._scheduled = False
timeout = None
if self._ready or self._stopping:
timeout = 0
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
timeout = max(0, when - self.time())
if self._debug and timeout != 0:
t0 = self.time()
event_list = self._selector.select(timeout)
dt = self.time() - t0
if dt >= 1.0:
level = logging.INFO
else:
level = logging.DEBUG
nevent = len(event_list)
if timeout is None:
logger.log(level, 'poll took %.3f ms: %s events',
dt * 1e3, nevent)
elif nevent:
logger.log(level,
'poll %.3f ms took %.3f ms: %s events',
timeout * 1e3, dt * 1e3, nevent)
elif dt >= 1.0:
logger.log(level,
'poll %.3f ms took %.3f ms: timeout',
timeout * 1e3, dt * 1e3)
else:
> event_list = self._selector.select(timeout)
/usr/lib/python3.5/asyncio/base_events.py:1276:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <selectors.EpollSelector object at 0x7ffae8dd41d0>, timeout = -1
def select(self, timeout=None):
if timeout is None:
timeout = -1
elif timeout <= 0:
timeout = 0
else:
# epoll_wait() has a resolution of 1 millisecond, round away
# from zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3) * 1e-3
# epoll_wait() expects `maxevents` to be greater than zero;
# we want to make sure that `select()` can be called when no
# FD is registered.
max_ev = max(len(self._fd_to_key), 1)
ready = []
try:
> fd_event_list = self._epoll.poll(timeout, max_ev)
E KeyboardInterrupt
/usr/lib/python3.5/selectors.py:441: KeyboardInterrupt
This came up when I was writing tests for some code that used asyncpg.
Essentially, if a pytest fixture uses the @pytest.mark.asyncio decorator or just directly calls the event loop from inside a synchronous function, the async tests which require that fixture will pass regardless of their content.
I've put together a minimal example here https://github.com/cshenton/pytest-asyncio-bug that only uses the standard library and pytest.
There are async tests, each of which awaits asyncio.sleep() and then calls assert False. They all depend on a fixture which also calls asyncio.sleep(), in different ways, and they all pass on my machine.
I am getting a ton of these warnings during test runs:
.tox/py36/lib/python3.6/site-packages/pytest_asyncio/plugin.py:77: DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()
asyncio.async(pyfuncitem.obj(**testargs), loop=event_loop))
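The fix is mechanical: asyncio.ensure_future() is a drop-in replacement for the deprecated asyncio.async() alias the plugin still calls. A minimal sketch of the substitution (the answer coroutine is illustrative, not the plugin's code):

```python
import asyncio

async def answer():
    # stand-in for the test coroutine the plugin wraps
    return 42

loop = asyncio.new_event_loop()
try:
    # deprecated spelling was: task = asyncio.async(answer(), loop=loop)
    task = asyncio.ensure_future(answer(), loop=loop)  # drop-in replacement
    result = loop.run_until_complete(task)
finally:
    loop.close()
```

The call signature is identical, so the plugin-side change is a one-word rename.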
Environment:
I am having problems testing a coroutine. The coroutine is called within an asyncio.gather() call and is a method of a class. The code is functional and runs as expected, but I can't seem to figure out how to create a working test case using pytest-asyncio.
Here is the coroutine code:
@asyncio.coroutine
def normalize_button_value(self, data, position):
"""
Button data is returned from the Esplora in negative logic format.
This method corrects this so that a button press is a "1" and a release is "0"
:param data: A list of sensor data elements
:param position: Position in the list for this button
:return: Corrected button data
"""
if data[position] == '0':
return '1'
else:
return '0'
My test code is a method in a test class that inherits from TestCase:
@pytest.mark.asyncio
def test_normalize_button_value(self, event_loop):
my_serial = EsploraSerial("/dev/ttyACM0", self.q, self.cond)
resp = event_loop.run_until_complete(my_serial.normalize_button_value(self.list_test_data, 1))
self.assertEquals(resp, '0')
The test fails and here is the output from the test:
self = <unittest.case._Outcome object at 0xb4b883ec>
test_case = <test.test_esploraSerial.TestEsploraSerial testMethod=test_normalize_button_value>
isTest = True
@contextlib.contextmanager
def testPartExecutor(self, test_case, isTest=False):
old_success = self.success
self.success = True
try:
> yield
/usr/local/lib/python3.4/unittest/case.py:58:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <test.test_esploraSerial.TestEsploraSerial testMethod=test_normalize_button_value>
result = <TestCaseFunction 'test_normalize_button_value'>
def run(self, result=None):
orig_result = result
if result is None:
result = self.defaultTestResult()
startTestRun = getattr(result, 'startTestRun', None)
if startTestRun is not None:
startTestRun()
result.startTest(self)
testMethod = getattr(self, self._testMethodName)
if (getattr(self.__class__, "__unittest_skip__", False) or
getattr(testMethod, "__unittest_skip__", False)):
# If the class or method was skipped.
try:
skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
or getattr(testMethod, '__unittest_skip_why__', ''))
self._addSkip(result, self, skip_why)
finally:
result.stopTest(self)
return
expecting_failure = getattr(testMethod,
"__unittest_expecting_failure__", False)
outcome = _Outcome(result)
try:
self._outcome = outcome
with outcome.testPartExecutor(self):
self.setUp()
if outcome.success:
outcome.expecting_failure = expecting_failure
with outcome.testPartExecutor(self, isTest=True):
> testMethod()
E TypeError: test_normalize_button_value() missing 1 required positional argument: 'event_loop'
Could you please provide some guidance as to what I may be doing wrong?
Thanks
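One way around the TypeError above is to drop both the marker and the fixture and drive the coroutine on a loop created by hand; pytest does not pass fixture arguments to unittest.TestCase test methods, which is what produces the error. A stdlib-only sketch against a standalone copy of the method (the free function below is an assumption, not the reporter's class):

```python
import asyncio

async def normalize_button_value(data, position):
    """Invert the Esplora's negative-logic button value."""
    return '1' if data[position] == '0' else '0'

def test_normalize_button_value():
    # drive the coroutine on a loop we own, no marker or fixture involved
    loop = asyncio.new_event_loop()
    try:
        resp = loop.run_until_complete(normalize_button_value(['9', '0'], 1))
    finally:
        loop.close()
    assert resp == '1'

test_normalize_button_value()
```

Moving the test out of the TestCase subclass and using @pytest.mark.asyncio with a plain async def test function would also avoid the problem.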
I got a ScopeMismatch error when I tried to add a fixture to a test function in a class, once I added the scope parameter (scope='class' or scope='module'):
My case:
import asyncio
import pytest
import pytest_asyncio
from .database import DB
@pytest.fixture(scope='class')
async def db_setup(request):
print("\nconnect to db")
db = await DB.create()
async def resource_teardown():
await db.close()
print("\ndisconnect")
request.addfinalizer(resource_teardown)
return db
class TestDB:
@pytest.mark.asyncio
async def test_connection(self, event_loop, db_setup):
db = await db_setup
with await db._pool as redis:
res = await redis.ping()
print(res)
assert res, "PONG"
And when I run it I get:
ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'class' scoped request object, involved factories
../../../../../.virtualenvs/jwt_auth/lib/python3.5/site-packages/pytest_asyncio/plugin.py:110: def wrapper(*args, **kwargs)
==================================== ERRORS ====================================
___________________ ERROR at setup of TestDB.test_connection ___________________
ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'class' scoped request object, involved factories
../../../../../.virtualenvs/jwt_auth/lib/python3.5/site-packages/pytest_asyncio/plugin.py:110: def wrapper(*args, **kwargs)
============================== 5 tests deselected ==============================
==================== 5 deselected, 1 error in 0.17 seconds =====================
Process finished with exit code 0
I tried to delete the event_loop param, but it gave the same error; however, when I removed the scope parameter from the fixture, everything works.
I have a test suite that uses pytest-asyncio for some of its tests and it works pretty well. Thank you for creating such a useful plugin.
All of my coroutine test functions are declared with async
. To avoid marking every such function with @pytest.mark.asyncio
, in the root conftest.py
for that suite I have added the following pytest hook:
import pytest
import inspect
def pytest_collection_modifyitems(session, config, items):
for item in items:
if isinstance(item, pytest.Function) and inspect.iscoroutinefunction(item.function):
item.add_marker(pytest.mark.asyncio)
So far I have not managed to find any drawback to doing this sort of thing - the async
keyword indicates coroutine functions just as clearly as the decorator does, and this approach seems to correctly mark each async test.
Could something like this be a part of pytest-asyncio itself, or is there a rationale for not including such a feature?
I understand that this project existed before Python had async/await semantics; however, you appear to have dropped support for Python < 3.5 (#57), which means that every supported Python version also supports the async keyword. Furthermore, I am not suggesting the removal or deprecation of the decorator approach, since there are probably valid use cases for creating coroutines without using async.
If this kind of feature would be appreciated, I can contribute the changes myself when I can find the time.
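For reference, the detection the proposed hook relies on is just inspect.iscoroutinefunction(), which distinguishes async def functions from plain ones:

```python
import inspect

async def an_async_test():
    pass

def a_sync_test():
    pass

# the hook would mark exactly the functions defined with `async def`
assert inspect.iscoroutinefunction(an_async_test)
assert not inspect.iscoroutinefunction(a_sync_test)
```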
Hi there!
I'm porting some of my tests from v0.3.0 to v0.5.0 and I have a test that behaves strangely when I switch to v0.4.1 or later.
After some debugging, I noticed that id(event_loop)
inside the test and id(asyncio.get_event_loop())
return different results for versions after v0.4.1 and v0.5.0.
After adding a call to asyncio.set_event_loop(event_loop), the test works again:
def test_serve(event_loop, cli):
asyncio.set_event_loop(event_loop) # wasn't required with v0.3.0.
Notice that I'm not using @pytest.mark.asyncio
here. I can't use it because I'm testing a click CLI that calls event_loop.run_until_complete()
internally.
I guess this is explained by the fact that the event_loop
fixture stopped calling asyncio.set_event_loop()
when the @pytest.mark.asyncio(forbid_global_event_loop=...)
syntax was introduced in v0.4.0.
In short, it seems like using only the event_loop
fixture (e.g. without @pytest.mark.asyncio
) leads to a surprising result. I think that the (absence of a) relationship between the event_loop
fixture and asyncio.get_event_loop()
should be added to the documentation to make it more obvious. If you're OK with this, I can send in a PR to fix it.
In passing, I think there's a big quirk in the API here: the behaviour for using the event_loop
fixture without the asyncio
marker is surprising, to say the least. The behavior in v0.3.0 was much better than it is now in that respect. This seems to be caused by the fact that forbid_global_event_loop
is a marker parameter rather than a fixture. If the policy for controlling the relationship with the default event loop were a fixture, it would allow the event_loop
fixture to decide what to do with the default event loop.
I've seen @asvetlov's comments in issue #12 and PR #24. There seems to be some contention over whether returning exceptions from asyncio.get_event_loop()
should be the default or not. I don't have a specific position on this, but I am convinced that either it should return the same value as the event_loop
fixture or it should fail (having two event loops by default is not ideal). A neat side-effect of making the policy for the default event loop a fixture on which the event_loop
fixture depends (instead of a parameter to the marker) would be to allow defining a project-specific default: each project would be able to define this new fixture as something returning a boolean (or enum value) to choose the policy and use autouse
to make it a project specific default.
What are your thoughts on this?
Right now, this test passes:
async def do_something_broken():
assert False
@pytest.mark.asyncio
async def test_something_broken():
do_something_broken()
The reason is that we forgot the await
in test_something_broken
, so the broken code never actually ran. Oops. Python does issue a RuntimeWarning: coroutine 'do_something_broken' was never awaited
, and recent pytest will print this at the end of tests, but this has a few issues:
============================= test session starts ==============================
platform linux -- Python 3.5.3[pypy-5.8.0-beta], pytest-3.2.2, py-1.4.34, pluggy-0.4.0
rootdir: /tmp, inifile:
plugins: cov-2.5.1, catchlog-1.2.2, asyncio-0.7.0
collected 1 item
../../tmp/test.py .
=========================== 1 passed in 0.02 seconds ===========================
I'm considering proposing a new feature for Python 3.7 that would make it possible for pytest-asyncio to do:
# Ask Python to start maintaining a list of unawaited coroutines
sys.set_unawaited_coroutine_tracking(True)
try:
... run the test ...
finally:
# Get the unawaited coroutines
unawaited_coroutines = sys.get_and_clear_unawaited_coroutines()
sys.set_unawaited_coroutine_tracking(False)
if unawaited_coroutines:
# Issue an error that points to the actual problem
raise RuntimeError(f"Unawaited coroutines: {unawaited_coroutines}")
(Names etc. to be bikeshedded later; this is "API 2" in python-trio/trio#79 (comment))
This way you could deterministically detect unawaited coroutines, reliably attribute them to the correct test, and cause it to fail with a useful error message.
Is this an API that you'd want to take advantage of if it were available?
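Until such an interpreter hook exists, the check can be approximated with the warnings machinery: CPython emits a RuntimeWarning through warnings.warn() when an un-awaited coroutine object is finalized. A sketch of capturing it deterministically (this relies on CPython's prompt, refcount-based finalization):

```python
import gc
import warnings

async def do_something_broken():
    assert False

def leaked_coroutine_warnings():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always", RuntimeWarning)
        do_something_broken()  # missing await: the coroutine is created, never run
        gc.collect()           # make the coroutine's finalizer run right here
    return [w for w in caught if "never awaited" in str(w.message)]

assert leaked_coroutine_warnings()  # the leak is detected, attributable to this call
```

This is weaker than the proposed API (it depends on GC timing and cannot hand back the coroutine objects themselves), which is exactly the gap the sys-level hook would close.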
In the asyncio world, the parameter for the event loop is always called loop, never event_loop.
I think we should use the same name for the fixture (while keeping backward compatibility, of course).
Also, disabling the global loop via asyncio.set_event_loop(None) is a useful feature.
I always run my tests in isolated environment.
The global loop may be a non-default option for the loop fixture.
event_loop should preserve the existing global strategy but may accept an option for disabling it (or we may keep it untouched, encouraging people to use the loop fixture).
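The isolation described above can be sketched with the stdlib alone: detach the global loop so any accidental asyncio.get_event_loop() fails, and pass the real loop explicitly (the ping coroutine is illustrative):

```python
import asyncio

async def ping():
    return "PONG"

loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)  # detach the global loop: get_event_loop() now raises
try:
    # everything receives the loop explicitly; nothing falls back to a global one
    result = loop.run_until_complete(ping())
finally:
    loop.close()
assert result == "PONG"
```

Any code that silently grabbed the default loop now fails loudly instead of running on the wrong loop, which is the point of the isolated-environment style.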
If you attempt to use pytest-asyncio with pytest-xdist and run py.test -n 4, it will fail with the exception in the title at this line.
Running it without -n 4
works as expected. I'm not sure what's going on here or why that attribute would be missing.
I have a session scoped fixture that runs a Postgres database in the background.
I'd like the connection pool to that database to last for the entire session as well.
This fails because the event_loop fixture is scoped to a function.
Is there a workaround for this problem?
File "/home/travis/build/mosquito/aio-pika/.tox/py34/lib/python3.4/site-packages/pkg_resources/__init__.py", line 2308, in resolve
module = __import__(self.module_name, fromlist=['__name__'], level=0)
File "/home/travis/build/mosquito/aio-pika/.tox/py34/lib/python3.4/site-packages/pytest_asyncio/plugin.py", line 81
async def setup():
Python 3.6 has introduced a new async generator entity:
async def foo():
yield 'Foo'
When trying to create a yield fixture in pytest (or a simple fixture, but with async def) and decorating it with @pytest.mark.asyncio, I receive a coroutine instead of the fixture result.
Is there a way to make pytest.mark.asyncio
recognize fixtures as coroutine functions or async generator functions and behave properly on them?
Consider this:
import pytest
async def foo():
print("foo")
@pytest.yield_fixture
def my_fixture(event_loop):
print(event_loop.is_closed()) # "False"
event_loop.run_until_complete(foo()) # "foo"
yield None
print(event_loop.is_closed()) # "True"... noooo!
event_loop.run_until_complete(foo()) # goes boom
@pytest.mark.asyncio
async def test_foo(my_fixture):
await foo()
Run this code with py.test -s <file>
and you'll see the output as indicated by the comments in the code.
The event loop is closed before my finalizer code can run. This is also the case when I write my fixture without a yield, instead adding a finalizer function.
This is weird. I'm pretty sure that this worked just fine a couple of months ago.
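Until the finalizer ordering is fixed, one defensive pattern is to check in the teardown whether the fixture's loop is already closed and fall back to a throwaway loop for the async cleanup. A stdlib sketch of that fallback (cleanup() is illustrative):

```python
import asyncio

async def cleanup():
    return "cleaned"

def run_cleanup(loop):
    # if the fixture's loop was closed before teardown ran, use a throwaway loop
    if loop.is_closed():
        tmp = asyncio.new_event_loop()
        try:
            return tmp.run_until_complete(cleanup())
        finally:
            tmp.close()
    return loop.run_until_complete(cleanup())

closed_loop = asyncio.new_event_loop()
closed_loop.close()                      # simulate the loop being closed too early
assert run_cleanup(closed_loop) == "cleaned"
```

This keeps teardown from going boom, though resources bound to the original loop (open transports, tasks) still cannot be cleaned up properly on the replacement loop.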
I just added the forbid_global_loop
parameter to the asyncio
marker. Most tests pass fine, I fixed a few others with accidental references to the default event loop. However, some of them fail in a strange way:
asyncio/base_events.py:342: in run_until_complete
return future.result()
asyncio/futures.py:274: in result
raise self._exception
asyncio/tasks.py:239: in _step
result = coro.send(value)
gitmesh/storage.py:35: in check_output
loop=loop,
asyncio/subprocess.py:197: in create_subprocess_shell
stderr=stderr, **kwds)
asyncio/base_events.py:940: in subprocess_shell
protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
asyncio/unix_events.py:179: in _make_subprocess_transport
with events.get_child_watcher() as watcher:
asyncio/events.py:604: in get_child_watcher
return get_event_loop_policy().get_child_watcher()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <pytest_asyncio.plugin.ForbiddenEventLoopPolicy object at 0x103dbba20>
def get_child_watcher(self):
"Get the watcher for child processes."
> raise NotImplementedError
E NotImplementedError
Seems like, at least on OSX, the asyncio
subprocess facilities depend on an undocumented get_child_watcher()
method of the event loop policy.
I'm guessing part of this problem exceeds the scope of this project: custom event loop policies must conform to an undocumented interface to be able to support subprocesses. That probably needs to be discussed with the asyncio maintainers.
However, I guess you'll also want to do something about this in this library because pytest_asyncio.plugin.ForbiddenEventLoopPolicy
currently doesn't implement this hidden interface.
I have a very simple app called "myapp". It uses the AsyncElasticsearch client:
from elasticsearch_async import AsyncElasticsearch
def create_app():
app = dict()
app['es_client'] = AsyncElasticsearch('http://index:9200/')
app['stuff'] = Stuff(app['es_client'])
return app
class Stuff:
def __init__(self, es_client):
self.es_client = es_client
def do_async_stuff(self):
return self.es_client.index(index='test',
doc_type='test',
body={'field': 'sample content'})
My question is not about AsyncElasticsearch; it just happens to be an async thing I want to work with. It could be something else, like a Mongo driver.
I want to test do_async_stuff()
and wrote the following conftest.py
import pytest
from myapp import create_app
@pytest.fixture(scope='session')
def app():
return create_app()
... and test_stuff.py
import pytest
@pytest.mark.asyncio
async def test_stuff(app):
await app['stuff'].do_async_stuff()
assert True
When I execute the test I get an exception with the message "attached to a different loop". Digging into that matter I found that pytest-asyncio creates a new event_loop for each test case (right?). The Elasticsearch client however, takes the default loop on instantiation and sticks with it. So I tried to convince it to use the pytest-asyncio event_loop like so:
import pytest
@pytest.mark.asyncio
async def test_stuff(app, event_loop):
app['es_client'].transport.loop = event_loop
await app['stuff'].do_async_stuff()
assert True
This however gives me another exception:
__________________________________ test_stuff __________________________________
app = {'es_client': <Elasticsearch([{'host': 'index', 'port': 9200, 'scheme': 'http'}])>, 'stuff': <myapp.Stuff object at 0x7ffbbaff1860>}
@pytest.mark.asyncio
async def test_stuff(app):
> await app['stuff'].do_async_stuff()
test/test_stuff.py:6:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>>
def __iter__(self):
if not self.done():
self._blocking = True
> yield self # This tells Task to wait for completion.
E RuntimeError: Task <Task pending coro=<test_stuff() running at /srv/app/backend/test/test_stuff.py:6> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]> got Future <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>> attached to a different loop
How am I supposed to test this scenario?
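The root cause can be sketched without Elasticsearch at all (`LoopBoundClient` below is a hypothetical stand-in, not part of any real library): any client that captures the current event loop at construction time must be created on the same loop that runs the test. This is why a function-scoped `app` fixture usually resolves the error while a session-scoped one does not.

```python
import asyncio


class LoopBoundClient:
    """Hypothetical stand-in for a client that grabs the loop at construction."""
    def __init__(self):
        self.loop = asyncio.get_event_loop()

    async def ping(self):
        # A real client would schedule I/O on self.loop here; if self.loop is
        # not the running loop, that is where "attached to a different loop"
        # comes from. We just assert the invariant instead.
        assert self.loop is asyncio.get_running_loop()
        return True


def run_test_with_fresh_loop():
    # pytest-asyncio creates a fresh loop per test; simulate that here.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    client = LoopBoundClient()  # created on the *test's* loop -> safe
    try:
        return loop.run_until_complete(client.ping())
    finally:
        loop.close()
        asyncio.set_event_loop(None)
```

With pytest-asyncio this translates to dropping `scope='session'` from the `app` fixture, or constructing the client lazily inside the test's loop.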
Hi,
I'm not sure this is the purpose of this library but I want to run pytest tests asynchronously.
Consider this example:
import asyncio

import pytest


@pytest.mark.asyncio
async def test_1():
    await asyncio.sleep(2)


@pytest.mark.asyncio
async def test_2():
    await asyncio.sleep(2)
$ py.test -q
..
2 passed in 4.01 seconds
It would be nice to run the test suite in ~2 seconds instead of 4. Is it currently possible with pytest-asyncio or with another library ? I guess we would need to asyncio.gather()
all async tests and run them in the same event loop.
Thanks !
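pytest itself collects and runs test items sequentially, so pytest-asyncio cannot overlap two test items on one loop out of the box. What does work today is gathering the concurrent work inside a single test. A minimal sketch of the idea in plain asyncio (the `check_*` coroutines are hypothetical; two 0.2 s sleeps finish in roughly 0.2 s total):

```python
import asyncio
import time


async def check_one():
    await asyncio.sleep(0.2)
    return 1


async def check_two():
    await asyncio.sleep(0.2)
    return 2


async def run_both():
    # gather schedules both coroutines on the same running loop
    return await asyncio.gather(check_one(), check_two())


def elapsed_for_gather():
    loop = asyncio.new_event_loop()
    start = time.monotonic()
    try:
        results = loop.run_until_complete(run_both())
    finally:
        loop.close()
    return results, time.monotonic() - start
```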
Does mock monkey patching work with pytest-dev?
I have 3 similar tests that allow me to stub out an instantiation of a pyserial object. The mock stub works in all 3 tests I have included below. However, I am also trying to monkeypatch a call to the get() method on a queue.
The first test succeeds, but I am doing a direct call to get and not yield from .
The second test is essentially the same as the first, but I use a yield from.
The third test just changes the return statement to a yield in the mockreturn function.
Below are the 3 test cases and the outputs from the failures.
Thanks,
Alan
import asyncio

import pytest
import serial

from esplora_serial import EsploraSerial


def test_some_interaction(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)
    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')
    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    # create a mock return for the queue "get"
    def mockreturn(get):
        return '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)
    resp = q.get()
    assert resp == "/abc"


@pytest.mark.asyncio
def test_some_interaction2(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)
    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')
    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    # create a mock return for the queue "get"
    def mockreturn(get):
        return '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)
    resp = yield from q.get()
    assert resp == "/abc"


@pytest.mark.asyncio
def test_some_interaction3(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)
    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')
    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    # create a mock return for the queue "get"
    def mockreturn(get):
        yield '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)
    resp = yield from q.get()
    assert resp == "/abc"
And now the test output:
/usr/local/bin/python3 /home/afy/PycharmProjects/pyvmmonitor/public_api/pyvmmonitor/__init__.py --profile=lsprof --spawn-ui=false /home/afy/pycharm-4.0.4/helpers/pycharm/pytestrunner.py -p pytest_teamcity /home/afy/PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py
Testing started at 7:26 PM ...
Patching args: ['/bin/sh', '-c', 'uname -p 2> /dev/null']
Process is not python, returning.
============================= test session starts ==============================
platform linux -- Python 3.4.3 -- py-1.4.26 -- pytest-2.7.0
rootdir: /home/afy/PycharmProjects/esp4s-aio/test, inifile:
plugins: asyncio, mock
collected 3 items
PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py .F
=================================== FAILURES ===================================
____________________________ test_some_interaction2 ____________________________
monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b146cc>
mocker = <pytest_mock.MockFixture object at 0xb4b1478c>
@pytest.mark.asyncio
def test_some_interaction2(monkeypatch, mocker):
# create a queue
q = asyncio.LifoQueue(maxsize=1)
# stub out the call to serial.Serial in the EsploraSerial class __init__
mocker.patch('serial.Serial')
# instantiate the Esplora Serial class
my_serial = EsploraSerial("/dev/ttyACM0", q)
#create a mock return for the queue "get"
def mockreturn(get):
return '/abc'
# monkey patch the lifo get to return "/abc"
monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)
> resp = yield from q.get()
E RuntimeError: Task got bad yield: '/'
PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:115: RuntimeError
____________________________ test_some_interaction3 ____________________________
monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b6156c>
mocker = <pytest_mock.MockFixture object at 0xb4b616cc>
@pytest.mark.asyncio
def test_some_interaction3(monkeypatch, mocker):
# create a queue
q = asyncio.LifoQueue(maxsize=1)
# stub out the call to serial.Serial in the EsploraSerial class __init__
mocker.patch('serial.Serial')
# instantiate the Esplora Serial class
my_serial = EsploraSerial("/dev/ttyACM0", q)
#create a mock return for the queue "get"
def mockreturn(get):
yield '/abc'
# monkey patch the lifo get to return "/abc"
monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)
> resp = yield from q.get()
PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:136:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
get = <LifoQueue at 0xb4b615ac maxsize=1>
def mockreturn(get):
> yield '/abc'
E RuntimeError: Task got bad yield: '/abc'
PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:131: RuntimeError
====================== 2 failed, 1 passed in 0.03 seconds ======================
Process finished with exit code 0
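For what it's worth, the "Task got bad yield" errors happen because `q.get()` is awaited, so whatever replaces it must itself be a coroutine (or return a Future), not a plain function or a bare generator. A standalone sketch of the working pattern (`fake_get` is a hypothetical replacement; `monkeypatch.setattr` is emulated here by assigning the class attribute directly):

```python
import asyncio


async def fake_get(self):
    # a real coroutine, so `yield from q.get()` / `await q.get()` is valid
    return '/abc'


def demo_patched_get():
    loop = asyncio.new_event_loop()
    q = asyncio.LifoQueue(maxsize=1)
    original = asyncio.LifoQueue.get
    # equivalent of monkeypatch.setattr(asyncio.LifoQueue, 'get', fake_get)
    asyncio.LifoQueue.get = fake_get
    try:
        resp = loop.run_until_complete(q.get())
    finally:
        asyncio.LifoQueue.get = original
        loop.close()
    return resp
```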
Hi!
I have question regarding how asynchronous fixture teardown (e.g. closing a connection that was open asynchronously) is supposed to work.
When I have a fixture such as:
@pytest.fixture(scope='session')
@async_generator
async def fixture():
    # ... async opening of a connection
    connection = await asyncpg.connect(...)
    await yield_(connection)
    await connection.close()
I am getting the following error: ValueError: Async generator fixture didn't stop. Yield only once.
I would like to add pytest-asyncio to conda-forge and noticed during review that pytest-asyncio does not have a LICENSE file as part of the pypi distribution.
Tested with current head, f6d84f6
The following test case fails
import asyncio

import pytest


@pytest.fixture()
async def async_inner_fixture():
    await asyncio.sleep(0.01)
    print('inner async_fixture start')
    yield True
    print('inner async_fixture stop')


@pytest.fixture()
async def async_fixture(async_inner_fixture):
    await asyncio.sleep(0.01)
    print('async_fixture start')
    yield True
    assert async_inner_fixture is True
    print('async_fixture stop')


@pytest.mark.asyncio
async def test_async_fixture(async_fixture):
    assert async_fixture is True
    print('test_async_fixture')
with the following error
F async_fixture start
test_async_fixture
tests/test_integration.py:66 (test_async_fixture)
async_inner_fixture = <async_generator object async_inner_fixture at 0x7f7db1f5d868>
@pytest.fixture()
async def async_fixture(async_inner_fixture):
await asyncio.sleep(0.01)
print('async_fixture start')
yield True
> assert async_inner_fixture is True
E assert <async_generator object async_inner_fixture at 0x7f7db1f5d868> is True
tests/test_integration.py:63: AssertionError
I.e. async_inner_fixture is not working properly when used in async_fixture.
Background
-k EXPRESSION only run tests which match the given substring
expression. An expression is a python evaluatable
expression where all names are substring-matched
against test names and their parent classes. Example:
-k 'test_method or test_other' matches all test
functions and classes whose name contains
'test_method' or 'test_other', while -k 'not
test_method' matches those that don't contain
'test_method' in their names. Additionally keywords
are matched to classes and functions containing extra
names in their 'extra_keyword_matches' set, as well as
functions which have names assigned directly to them.
Steps to reproduce
@pytest.mark.asyncio
async def test_bug():
    pass


@pytest.mark.asyncio
async def test_integration():
    pass
-k test_integration
Observed output:
core/test_integration.py::test_bug PASSED
core/test_integration.py::test_integration PASSED
Expected output:
core/test_integration.py::test_integration PASSED
Note that -k test_bug
works fine.
In order to handle asyncio subprocesses with both stdout and stderr callbacks, one needs a child watcher that's attached to the loop, so the event_loop fixture needs to look something like:
def get_event_loop():
    if sys.platform == 'win32':
        return asyncio.ProactorEventLoop()  # on windows IO needs this
    return asyncio.new_event_loop()  # default on UNIX is fine


@pytest.yield_fixture()
def event_loop():
    """pytest-asyncio customization"""
    loop = get_event_loop()
    if sys.platform != "win32":
        # on UNIX we also need to attach the loop to the child watcher
        # for asyncio.subprocess
        policy = asyncio.get_event_loop_policy()
        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(loop)
        policy.set_child_watcher(watcher)
    try:
        yield loop
    finally:
        loop.close()
Now the problem is that the pytest-asyncio finalizer tries to attach back to the old event loop: https://github.com/pytest-dev/pytest-asyncio/blob/master/pytest_asyncio/plugin.py#L139, and at that point all watchers are also attached to that loop. However, you cannot attach to a closed loop, so if the old loop is closed the finalizer will raise RuntimeError('Event loop is closed')
😞
points of interest:
attach_loop
for watcher - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L758
add_signal_handler
check closed - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L84
set_event_loop
for policy - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L1007
As a workaround, setting the event loop to None at the start solves this; however, maybe we should not attach back to closed loops, and instead just discard them.
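The failure mode the finalizer hits can be reproduced in isolation: any attempt to run a closed loop raises the same RuntimeError. A minimal sketch:

```python
import asyncio


def closed_loop_error():
    loop = asyncio.new_event_loop()
    loop.close()
    coro = asyncio.sleep(0)
    try:
        # run_until_complete checks loop state before scheduling anything
        loop.run_until_complete(coro)
    except RuntimeError as exc:
        return str(exc)
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning
    return None
```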
having a minimal:
from pytest import mark


@mark.xfail(reason="I need a failing test")
def test_fail_please():
    assert False


@mark.xfail(reason="I'm testing for a pytest-asyncio-bug - if I don't fail something is broken.")
@mark.asyncio
def test_fail_async():
    assert False
test_fail_async
fails as expected with pytest-asyncio==0.3.0, but succeeds with pytest-asyncio==0.4.0.
(running python 3.5.1)
People very often ask why pytest-asyncio
and pytest-aiohttp
are not compatible.
Well, AFAIK they are not mutually exclusive, but the compatibility is still not perfect.
I had a very strong objection to pytest-asyncio
behavior: from my perspective the event_loop
fixture should call asyncio.set_event_loop(None)
before running a test. That prevents forgetting to pass the loop
param to calls like asyncio.sleep()
etc.
But now I withdraw my objection: since Python 3.5.3 a coroutine always gets the running event loop, regardless of asyncio.set_event_loop().
We still have many differences between pytest-asyncio
and pytest-aiohttp
: the need for the special @pytest.mark.asyncio
marker in pytest-asyncio
and the event loop naming.
In pytest-asyncio
it is event_loop
but in pytest-aiohttp
the name is loop
.
The loop
name is used everywhere in the asyncio
documentation; I'd love to keep it (at least the name is shorter).
@Tinche would you consider adding the loop
fixture as an alias for event_loop
?
I'm not a pytest
jedi; in my mind the fixture should look like:
@pytest.fixture
def loop(event_loop):
    return event_loop
but maybe you have a better idea.
Love the lib!
Right now I can't use it with code that closes the event loop. Could you make it check loop.is_closed() and, if the loop is already closed, not attempt to close it again (which makes the test fail)?
Got Error:
RuntimeError: There is no current event loop in thread 'MainThread'.
For example:
https://travis-ci.org/botstory/botstory/builds/236994191#L594
Sources of unit test:
https://github.com/botstory/botstory/blob/develop/botstory/chat_test.py#L38
@pytest.mark.asyncio
async def test_should_say(mock_interface):
    with answer.Talk() as talk:
        story = talk.story
        story.use(mock_interface)

        @story.on('hi there!')
        def one_story():
            @story.part()
            async def then(ctx):
                await story.say('Nice to see you!', ctx['user'])

        await talk.pure_text('hi there!')

        mock_interface.send_text_message.assert_called_once_with(
            recipient=talk.user,
            text='Nice to see you!',
            options=None,
        )
And the same error occurs for many other async tests marked with @pytest.mark.asyncio
, with and without fixtures.
Deps are: https://github.com/botstory/botstory/blob/develop/requirements.txt
aiohttp==2.1.0
motor==1.1
pytest==3.1.0
pytest-aiohttp==0.1.3
pytest-asyncio==0.5.0
pytest-catchlog==1.2.2
pytest-cov==2.5.1
pytest-flakes==2.0.0
pytest-mock==1.6.0
yarl==0.10.2
The previous version works fine, except for a deprecation warning.
I am using pytest_asyncio for my asyncio tests and am trying to get it to work with another fixture I wrote. They work together fine as long as I only access the loop inside the test function by calling functions on the passed in fixture. I was trying to figure out if it was possible to access the loop in the fixture function itself (or even pytest_runtest_setup).
I'm not that familiar with py.test internals, but as far as I can tell from reading examples and source code, it looks like pytest_asyncio creates the loop right before calling the test function (after *_setup and fixture functions are called) and then closes the loop right after execution. Is this right?
Specifically, what I'm trying to do is kick off an asyncio.ensure_future()
right before the test runs using the loop that the test will be running on.
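Under the assumption that the event_loop fixture hands you the very loop the test will run on (which is how pytest-asyncio is documented to behave), the pattern can be sketched with plain asyncio: schedule the task on the not-yet-running loop, then run the test coroutine, and the pre-scheduled task gets its turn first. The `background`/`the_test` names below are hypothetical:

```python
import asyncio

results = []


async def background():
    # the task kicked off before the test starts
    results.append('background ran')


async def the_test():
    await asyncio.sleep(0)  # give pre-scheduled tasks a chance to run
    return list(results)


def run():
    loop = asyncio.new_event_loop()
    try:
        # schedule *before* the test coroutine starts, on the test's loop;
        # a fixture receiving event_loop could do exactly this
        asyncio.ensure_future(background(), loop=loop)
        return loop.run_until_complete(the_test())
    finally:
        loop.close()
```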
Getting:
E RuntimeError: Task <Task pending coro=<pytest_fixture_setup.<locals>.wrapper.<locals>.setup() running at /home/travis/build/plone/guillotina/eggs/pytest_asyncio-0.6.0-py3.6.egg/pytest_asyncio/plugin.py:120> cb=[_run_until_complete_cb() at /opt/python/3.6-dev/lib/python3.6/asyncio/base_events.py:176]> got Future <_GatheringFuture pending> attached to a different loop
on many of my tests now.
Downgrading to 0.5.0 works. Is there something I'm doing wrong in my tests?
It would be useful when testing an application that uses a custom implementation of asyncio.AbstractEventLoop
to be able to provide a custom event loop policy and/or custom event loop via a configuration option.
It is of course already possible to set the event loop policy globally with asyncio.set_event_loop_policy
(e.g. in pytest_configure
) or to override the event_loop
fixture so that it returns a loop of the desired class, so perhaps this would be unnecessary.
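For completeness, here is a sketch of the policy half of that workaround (`CustomPolicy` and the `custom_marker` attribute are invented for illustration; a real implementation could return any AbstractEventLoop subclass from new_event_loop):

```python
import asyncio


class CustomPolicy(asyncio.DefaultEventLoopPolicy):
    """Hypothetical policy; a real one could return any AbstractEventLoop."""
    def new_event_loop(self):
        loop = super().new_event_loop()
        loop.custom_marker = True  # stand-in for custom behaviour
        return loop


def loop_from_custom_policy():
    asyncio.set_event_loop_policy(CustomPolicy())
    try:
        loop = asyncio.new_event_loop()  # now routed through CustomPolicy
        marker = getattr(loop, 'custom_marker', False)
        loop.close()
    finally:
        asyncio.set_event_loop_policy(None)  # restore the default policy
    return marker
```

With pytest-asyncio, the same policy could be installed in pytest_configure, or the event_loop fixture could be overridden in conftest.py to return a loop created by the custom policy.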
Hi,
My project is Python 3.5 only, so it's impossible for me to use async generators in fixtures. But we might be able to work around this if it were possible to register async finalizers on request
.
E.g.
@pytest.fixture
async def foo_fixture(request):
    async def fin():
        await some_stuff()
    request.addasyncfinalizer(fin)
    return 42
Then it would await each async finalizer before teardown.
I'm not sure whether it would involve monkey patching the request object, or what would be acceptable. Maybe there are more proper solutions that don't involve request
, or that make use of already existing features? I already tried loop.create_task()
in a standard finalizer; unfortunately, the created task seems to execute after teardown, which produces the following error message:
2017-06-26 18:15:50,262 - asyncio - ERROR - Task was destroyed but it is pending!
Many thanks!
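There is no addasyncfinalizer on request, but assuming the test's loop is still open when finalizers run, a plain addfinalizer can drive the coroutine to completion with run_until_complete instead of create_task (which is why the created task above was destroyed while pending: nothing ever ran the loop again). A standalone sketch with hypothetical names:

```python
import asyncio

teardown_log = []


async def async_cleanup():
    # stands in for `await some_stuff()` in the fixture's finalizer
    await asyncio.sleep(0)
    teardown_log.append('cleaned up')


def fin(loop):
    # what request.addfinalizer(lambda: fin(loop)) could do: drive the
    # coroutine to completion now, rather than loop.create_task(), which
    # never runs once the test's loop has stopped
    loop.run_until_complete(async_cleanup())


def demo():
    loop = asyncio.new_event_loop()
    try:
        fin(loop)  # simulate pytest invoking the finalizer at teardown
    finally:
        loop.close()
    return list(teardown_log)
```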
See the aio-libs/pytest-aiohttp#8
I am not sure if it is the fault of one plugin or the other, or whether some naming convention is needed so they don't clash with each other.
Personally I consider pytest-asyncio
more general and would expect a fix on the pytest-aiohttp
side, but it is too early to tell before the cause is well understood.
Hi,
I see that there is a 0.4.1 version on pypi but the code here on GitHub is still on version 0.4.0. Can you publish the commit for 0.4.1? I am asking this because I am maintaining this package for fedora and I use the source from GitHub and I would like to update it to 0.4.1.
easy_install
is not capable[1] of installing pytest-asyncio
. I know pip
is the tool for the job, but when you use setup.py to manage requirements it uses easy_install
. I think the problem could be the name of the wheel package, which is pytest_asyncio
with an underscore (_).
[1] https://packaging.python.org/en/latest/pip_easy_install.html
$ easy_install pytest-asyncio
Searching for pytest-asyncio
Reading https://pypi.python.org/simple/pytest-asyncio/
No local packages or download links found for pytest-asyncio
error: Could not find suitable distribution for Requirement.parse('pytest-asyncio')
(test)wiliam@vostro:~/.../test$ easy_install pytest_asyncio
Searching for pytest-asyncio
Reading https://pypi.python.org/simple/pytest_asyncio/
No local packages or download links found for pytest-asyncio
error: Could not find suitable distribution for Requirement.parse('pytest-asyncio')
This broke our CI (e.g. here), and we are considering forking pytest-asyncio.
Could someone explain why support for Python < 3.5 was dropped? Any particular reason?
Currently asyncio doesn't support test methods (see #4):
class Test:
    @pytest.mark.asyncio
    def test_some_asyncio_code(self):
        res = yield from library.do_something()
        assert b'expected result' == res
An upgrade to 0.4.1 broke all my tests, and I'm unable to locate any kind of changelog that could give me clues about what's going wrong… could you start keeping one please? :)