Giter Club home page Giter Club logo

pytest-asyncio's People

Contributors

agronholm avatar alblasco avatar asvetlov avatar bluetech avatar dependabot[bot] avatar dirn avatar gazorby avatar graingert avatar hynek avatar jonafato avatar josekilo avatar kianmeng avatar kriek avatar mgorny avatar nicoddemus avatar opoplawski avatar pipermerriam avatar robertcraigie avatar sashgorokhov avatar seifertm avatar simonfagerholm avatar smagafurov avatar sobolevn avatar tadaboody avatar timothyfitz avatar tinche avatar touillewoman avatar ujjwal123123 avatar wjsi avatar zac-hd avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

pytest-asyncio's Issues

How to specify two different ports by `unused_tcp_port` fixture

I have functional tests for my system which requires several components running on different ports
As I see unused_tcp_port has functional scope, so it's impossible to acquire two different ports (for different purposes, sure) in the same test.
Or maybe I missed something?

@mock.patch decorator doesnt work properly with @pytest.mark.asyncio

Sample test code:

tests.py

from unittest import mock

import pytest
import testmodule


@pytest.mark.asyncio
async def test_async_mock_patch_contextmanager():
    with mock.patch('testmodule.foo') as foo_mock:
        assert testmodule.foo is foo_mock


@pytest.mark.asyncio
@mock.patch('testmodule.foo')
async def test_async_mock_patch_decorator(foo_mock):
    assert testmodule.foo is foo_mock  # FAILS


def test_mock_patch_contextmanager():
    with mock.patch('testmodule.foo') as foo_mock:
        assert testmodule.foo is foo_mock


@mock.patch('testmodule.foo')
def test_mock_patch_decorator(foo_mock):
    assert testmodule.foo is foo_mock

testmodule.py

def foo():
    return 'Bar'

test_async_mock_patch_decorator will fail, because testmodule.foo is <function foo at ###>, not <MagicMock name='foo' id='###'>

Tested this on python3.6 only.

UPDATE:

Seems that on python 3.5.2 there is no such bug. All tests succeeded.

How to catch exception inside of coroutine that was created by asyncio.ensure_future

I have scenario that tests part of app with a complex schedule of concurrent tasks where I'm using asyncio.ensure_future. Here is simplified example of it:

import asyncio
import pytest

async def a_second():
    await asyncio.sleep(0.1)

@pytest.mark.asyncio
async def test_very_lazy_task(event_loop):
    async def one():
        # we are waiting for this exception which isn't never caught
        pytest.fail('Got it!')

    # param loop=event_loop doesn't influent on a result at all
    asyncio.ensure_future(one(), loop=event_loop)
    await a_second()
    # with this guy we can check that we actually have Got it error in our logs
    # pytest.fail('after all!')

Sadly it seems that function one falls out of pytest asyncio sphere of influence. And pytest.fail('Got it!') raises exception but was never caught by pytest.

with pytest.fail('after all!') I got:

============================================= FAILURES ==============================================
________________________________________ test_very_lazy_task ________________________________________

event_loop = <_UnixSelectorEventLoop running=False closed=True debug=False>

    @pytest.mark.asyncio
    async def test_very_lazy_task(event_loop):
        async def one():
            pytest.fail('Got it!')

        asyncio.ensure_future(one(), loop=event_loop)
        await a_second()
>       pytest.fail('after all!')
E       Failed: after all!

fb/conversation/test_story.py:125: Failed
--------------------------------------- Captured stderr setup ---------------------------------------
DEBUG:asyncio:Using selector: EpollSelector
---------------------------------------- Captured log setup -----------------------------------------
selector_events.py          53 DEBUG    Using selector: EpollSelector
--------------------------------------- Captured stderr call ----------------------------------------
ERROR:asyncio:Task exception was never retrieved
future: <Task finished coro=<test_very_lazy_task.<locals>.one() done, defined at /usr/src/app/fb/conversation/test_story.py:120> exception=Got it!>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/src/app/fb/conversation/test_story.py", line 121, in one
    pytest.fail('Got it!')
  File "/usr/local/lib/python3.5/site-packages/_pytest/runner.py", line 486, in fail
    raise Failed(msg=msg, pytrace=pytrace)
Failed: Got it!
----------------------------------------- Captured log call -----------------------------------------
base_events.py            1090 ERROR    Task exception was never retrieved
future: <Task finished coro=<test_very_lazy_task.<locals>.one() done, defined at /usr/src/app/fb/conversation/test_story.py:120> exception=Got it!>
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/usr/src/app/fb/conversation/test_story.py", line 121, in one
    pytest.fail('Got it!')
  File "/usr/local/lib/python3.5/site-packages/_pytest/runner.py", line 486, in fail
    raise Failed(msg=msg, pytrace=pytrace)
Failed: Got it!

So what it the right way to catch exception inside of coroutine functions that were launched by asyncio.ensure_future?

Unable the use with unittest.TestCase

I am trying to write a test with async code AND using the unittest.TestCase base class. When I use TestCase the test method always passes.
What I am doing wrong?

import pytest                                                                                  
import unittest
import aiohttp
import asyncio

URL = "http://www.github.com"


class TestOnUnittestClass(unittest.TestCase):

    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
        assert req.status == 201

class TestOnClass:

    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
        assert req.status == 201

@pytest.mark.asyncio
def test_async():
    req = (yield from aiohttp.request("GET", URL))
    assert req.status == 201

The output is:

$ py.test -vvvv async_on_class_test.py
===================================================================================== test session starts =====================================================================================
platform linux -- Python 3.4.0 -- py-1.4.30 -- pytest-2.7.2 -- /home/paurullan/.virtualenvs/status/bin/python3
rootdir: /home/paurullan/projects/status-qa, inifile: 
plugins: asyncio, xdist, colordots
collected 3 items 

async_on_class_test.py::TestOnUnittestClass::test_get PASSED
async_on_class_test.py::TestOnClass::test_get FAILED
async_on_class_test.py::test_async FAILED

========================================================================================== FAILURES ===========================================================================================
____________________________________________________________________________________ TestOnClass.test_get _____________________________________________________________________________________

self = <async_on_class_test.TestOnClass object at 0x7f70e6e3e898>

    @asyncio.coroutine
    @pytest.mark.asyncio
    def test_get(self):
        req = (yield from aiohttp.request("GET", URL))
>       assert req.status == 201
E       assert 200 == 201
E        +  where 200 = <ClientResponse(https://github.com/) [200 OK]>\n<CIMultiDictProxy {'SERVER': 'GitHub.com', 'DATE': 'Tue, 11 Aug 2015 15...S': 'deny', 'VARY': 'Accept-Encoding', 'X-SERVED-BY': '53e13e5d66f560f3e2b04e74a099de0d', 'CONTENT-ENCODING': 'gzip'}>\n.status

async_on_class_test.py:23: AssertionError
_________________________________________________________________________________________ test_async __________________________________________________________________________________________

    @pytest.mark.asyncio
    def test_async():
        req = (yield from aiohttp.request("GET", URL))
>       assert req.status == 201
E       assert 200 == 201
E        +  where 200 = <ClientResponse(https://github.com/) [200 OK]>\n<CIMultiDictProxy {'SERVER': 'GitHub.com', 'DATE': 'Tue, 11 Aug 2015 15...S': 'deny', 'VARY': 'Accept-Encoding', 'X-SERVED-BY': 'a128136e4734a9f74c013356c773ece7', 'CONTENT-ENCODING': 'gzip'}>\n.status

async_on_class_test.py:28: AssertionError
============================================================================= 2 failed, 1 passed in 2.43 seconds ==============================================================================

Help with simple example

I'm sure I'm doing something wrong, but I can't get tests working with pytest.mark.asyncio.

import pytest

@pytest.mark.asyncio
async def test_foo():
    assert False

Running py.test claims that test_foo was executed and passed.

Using pytest 2.9.2, pytest-asyncio 0.4.0.

unittest support: Coroutine was never awaited

I am trying to create a test case for the below async function get_data. I get Runtime warning as RuntimeWarning: coroutine 'TestAsyncStatSvc.test_async_get_data' was never awaited testfunction(**testargs)

Below is my code. Please guide on this one. AFAIK, we get this exception when there is no event loop so I created one in the fixture. What is there that I am missing?

async def get_data(self, data_id=None):
          sql = """
              SELECT id, description
              FROM data_table
          """
          sql_params = []
          if data_id:
              sql += """
              WHERE id = $1
              """
              sql_params += [data_id]
          result = await self.dbproxy_async.execute_query_async(sql, sql_params)

          if result.empty and data_id:
              raise NotFound('data %s not found.' % data_id)
          return result.to_json()

Below Is the test case:

  class TestAsyncStatSvc(object):
      
          TEST_DATA = {
                  'Key1': ['Key1', 'Data desc for key1'],
                  'Key2': ['Key2', 'Data desc for key2'],
                  None: [
                      ['Key1', 'Data desc for key1'],
                      ['Key2', 'Data desc for key2']
                  ]
              }
          
          @pytest.mark.asyncio
          async def test_async_get_data(self, data_svc_fixture):
                  for query_value in TESTDATA.keys:

                      execute_stub = MagicMock(return_value=self.TESTDATA[query_value])

                      # Wrap the stub in a coroutine (so it can be awaited)
                      execute_coro = asyncio.coroutine(execute_stub)

                      # Stub the database db_proxy
                      db_proxy = MagicMock()
                      db_proxy.execute_query_async = execute_coro
          
                      result = data_svc_fixture.get_data(data_id=query_value)            
                      assert result == self.TESTDATA[query_value]
                      
                      if query_value is not None:
                          assert len(comm) == 1
                      else:
                          assert len(comm) == 2
          
                  with pytest.raises(NotFound):
                      data_svc_fixture.get_data('Unobtained')

And here are the fixtures:

        class Context:
            def __init__(self, loop, svc_fixture):
                self.loop = loop
                self.svc_fixture = svc_fixture
        
        
        @pytest.yield_fixture(scope="function")
        def data_svc_fixture(db_proxy_fixture, get_service_fixture, my_svc_configurator_fixture, event_loop):
            ctx = Context(event_loop, get_service_fixture('MySvc'))
            yield ctx
            event_loop.run_until_complete(asyncio.gather(*asyncio.Task.all_tasks(event_loop), return_exceptions=True))
    
        @pytest.yield_fixture()
        def event_loop():
              loop = asyncio.new_event_loop()
              asyncio.set_event_loop(loop)
              yield loop
              loop.close()

Make a TCP server and a client fixture.

So you can do in a test:

def test_foo(tcp_client):
    client = await tcp_client([port=ip_factory()], [ip="localhost"])
    await my_server.start(client.port, client.ip)
    await client.send(b'some_byte')
    assert my_server.has_done_stuff()


def test_bar(tcp_server):
    server = await tcp_server([port=ip_factory()], [ip="localhost"])

    # repond b'somebytes' when the incomming data match the "on" test
    server.respond(b'somebytes', [on=lambda request: True])

    client = await my_client(server.ip, server.port) 
    assert client.send(b'some_byte') # get the expected answer 
    assert my_client.has_done_stuff()

Very useful for end to end testing.

Complete move to pytest-dev

Hi @Tinche,

I have set you up as owner of this repository, and a collaborator of @pytest-dev, so I think all is good. 😄

To complete the transition, here are a couple of things it would be nice to do:

  • Update links in code (setup.py, README, etc) and documentation;
  • Update Travis connection;
  • Update coveralls connection;

Feel free to ask for help with any of these, here or in the mailing list.

Welcome! 😄

Error 'TooManyConnectionsError'

I run many tests together, and error 'TooManyConnectionsError' occurs:

.........E........EEEE..
==================================== ERRORS ====================================
______________________ ERROR at setup of test_demo_0409_2 ______________________

args = (), kwargs = {}
loop = <_UnixSelectorEventLoop running=False closed=False debug=False>
setup = <function pytest_fixture_setup.<locals>.wrapper.<locals>.setup at 0x7f357c13f488>

    def wrapper(*args, **kwargs):
        loop = kwargs['event_loop']
        if strip_event_loop:
            del kwargs['event_loop']

        async def setup():
            res = await f(*args, **kwargs)
            return res

>       return loop.run_until_complete(setup())

/usr/local/lib/python3.6/site-packages/pytest_asyncio/plugin.py:123:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
/usr/local/lib/python3.6/asyncio/base_events.py:467: in run_until_complete
    return future.result()
/usr/local/lib/python3.6/site-packages/pytest_asyncio/plugin.py:120: in setup
    res = await f(*args, **kwargs)
../apps/conf/test/db.py:38: in pool_setup
    return await db.create_pool(conn_str)
/usr/local/lib/python3.6/site-packages/gino/pool.py:147: in _async__init__
    rv = await super()._async__init__()
/usr/local/lib/python3.6/site-packages/asyncpg/pool.py:349: in _async__init__
    await asyncio.gather(*connect_tasks, loop=self._loop)
/usr/local/lib/python3.6/site-packages/asyncpg/pool.py:138: in connect
    connection_class=self._pool._connection_class)
/usr/local/lib/python3.6/site-packages/asyncpg/connect_utils.py:274: in _connect_addr
    await asyncio.wait_for(connected, loop=loop, timeout=timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

fut = <Future finished exception=TooManyConnectionsError('sorry, too many clients already',)>
timeout = 59.981232699006796

    @coroutine
    def wait_for(fut, timeout, *, loop=None):
        """Wait for the single Future or coroutine to complete, with timeout.

        Coroutine will be wrapped in Task.

        Returns result of the Future or coroutine.  When a timeout occurs,
        it cancels the task and raises TimeoutError.  To avoid the task
        cancellation, wrap it in shield().

        If the wait is cancelled, the task is also cancelled.

        This function is a coroutine.
        """
        if loop is None:
            loop = events.get_event_loop()

        if timeout is None:
            return (yield from fut)

        waiter = loop.create_future()
        timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
        cb = functools.partial(_release_waiter, waiter)

        fut = ensure_future(fut, loop=loop)
        fut.add_done_callback(cb)

        try:
            # wait until the future completes or the timeout
            try:
                yield from waiter
            except futures.CancelledError:
                fut.remove_done_callback(cb)
                fut.cancel()
                raise

            if fut.done():
>               return fut.result()
E               asyncpg.exceptions.TooManyConnectionsError: sorry, too many clients already

/usr/local/lib/python3.6/asyncio/tasks.py:358: TooManyConnectionsError

Add Python 3.6 support

I opened #43 for this but it looks like it's a bit more involved due to some tests no longer passing.

[problem]: some test cases get hung up during execution

problem:

Some of my test cases get hung up during execution and similar cases are fine, and the problem can be reproducted. The case have problem can run smoothly if i steply execute it with pycharm. I have no idea what caused that problem, so i ctrl+c it and run with --fulltrace then. Is it possible that there are some kind of problems of my code? Maybe i use asyncio in wrong way? The detail is pasted below, hope that will be helpful.

detail

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! KeyboardInterrupt !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

config = <_pytest.config.Config object at 0x7ffaeb5bbef0>, doit = <function _main at 0x7ffaeb5197b8>

    def wrap_session(config, doit):
        """Skeleton command line program"""
        session = Session(config)
        session.exitstatus = EXIT_OK
        initstate = 0
        try:
            try:
                config._do_configure()
                initstate = 1
                config.hook.pytest_sessionstart(session=session)
                initstate = 2
>               session.exitstatus = doit(config, session) or 0

env/local/lib/python3.5/site-packages/_pytest/main.py:98: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

config = <_pytest.config.Config object at 0x7ffaeb5bbef0>, session = <Session 'aredis'>

    def _main(config, session):
        """ default command line protocol for initialization, session,
        running tests and reporting. """
        config.hook.pytest_collection(session=session)
>       config.hook.pytest_runtestloop(session=session)

env/local/lib/python3.5/site-packages/_pytest/main.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_HookCaller 'pytest_runtestloop'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}

    def __call__(self, **kwargs):
        assert not self.is_historic()
>       return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtestloop'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}

    def _hookexec(self, hook, methods, kwargs):
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
>       return self._inner_hookexec(hook, methods, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

hook = <_HookCaller 'pytest_runtestloop'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>, 'session': <Session 'aredis'>}

    self._inner_hookexec = lambda hook, methods, kwargs: \
>       _MultiCall(methods, kwargs, hook.spec_opts).execute()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 0 meths, kwargs={'session': <Session 'aredis'>, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
                return _wrapped_call(hook_impl.function(*args), self.execute)
>           res = hook_impl.function(*args)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

session = <Session 'aredis'>

    def pytest_runtestloop(session):
        if (session.testsfailed and
                not session.config.option.continue_on_collection_errors):
            raise session.Interrupted(
                "%d errors during collection" % session.testsfailed)
    
        if session.config.option.collectonly:
            return True
    
        for i, item in enumerate(session.items):
            nextitem = session.items[i+1] if i+1 < len(session.items) else None
>           item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)

env/local/lib/python3.5/site-packages/_pytest/main.py:154: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_HookCaller 'pytest_runtest_protocol'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}

    def __call__(self, **kwargs):
        assert not self.is_historic()
>       return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtest_protocol'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}

    def _hookexec(self, hook, methods, kwargs):
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
>       return self._inner_hookexec(hook, methods, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

hook = <_HookCaller 'pytest_runtest_protocol'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 ...tem': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>, 'nextitem': None}

    self._inner_hookexec = lambda hook, methods, kwargs: \
>       _MultiCall(methods, kwargs, hook.spec_opts).execute()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
>               return _wrapped_call(hook_impl.function(*args), self.execute)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

wrap_controller = <generator object pytest_runtest_protocol at 0x7ffae90384c0>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>

    def _wrapped_call(wrap_controller, func):
        """ Wrap calling to a function with a generator which needs to yield
        exactly once.  The yield point will trigger calling the wrapped function
        and return its _CallOutcome to the yield point.  The generator then needs
        to finish (raise StopIteration) in order for the wrapped call to complete.
        """
        try:
            next(wrap_controller)   # first yield
        except StopIteration:
            _raise_wrapfail(wrap_controller, "did not yield")
        call_outcome = _CallOutcome(func)
        try:
            wrap_controller.send(call_outcome)
            _raise_wrapfail(wrap_controller, "has second yield")
        except StopIteration:
            pass
>       return call_outcome.get_result()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dcddd8>

    def get_result(self):
        if self.excinfo is None:
            return self.result
        else:
            ex = self.excinfo
            if _py3:
>               raise ex[1].with_traceback(ex[2])

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dcddd8>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>

    def __init__(self, func):
        try:
>           self.result = func()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 0 meths, kwargs={'nextitem': None, '__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
                return _wrapped_call(hook_impl.function(*args), self.execute)
>           res = hook_impl.function(*args)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

item = <Function 'test_channel_message_handler'>, nextitem = None

    def pytest_runtest_protocol(item, nextitem):
        item.ihook.pytest_runtest_logstart(
            nodeid=item.nodeid, location=item.location,
        )
>       runtestprotocol(item, nextitem=nextitem)

env/local/lib/python3.5/site-packages/_pytest/runner.py:66: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

item = <Function 'test_channel_message_handler'>, log = True, nextitem = None

    def runtestprotocol(item, log=True, nextitem=None):
        hasrequest = hasattr(item, "_request")
        if hasrequest and not item._request:
            item._initrequest()
        rep = call_and_report(item, "setup", log)
        reports = [rep]
        if rep.passed:
            if item.config.option.setupshow:
                show_test_item(item)
            if not item.config.option.setuponly:
>               reports.append(call_and_report(item, "call", log))

env/local/lib/python3.5/site-packages/_pytest/runner.py:79: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

item = <Function 'test_channel_message_handler'>, when = 'call', log = True, kwds = {}

    def call_and_report(item, when, log=True, **kwds):
>       call = call_runtest_hook(item, when, **kwds)

env/local/lib/python3.5/site-packages/_pytest/runner.py:133: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

item = <Function 'test_channel_message_handler'>, when = 'call', kwds = {}, hookname = 'pytest_runtest_call'

    def call_runtest_hook(item, when, **kwds):
        hookname = "pytest_runtest_" + when
        ihook = getattr(item.ihook, hookname)
>       return CallInfo(lambda: ihook(item=item, **kwds), when=when)

env/local/lib/python3.5/site-packages/_pytest/runner.py:151: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <[AttributeError("'CallInfo' object has no attribute 'result'") raised in repr()] CallInfo object at 0x7ffae8dd4128>
func = <function call_runtest_hook.<locals>.<lambda> at 0x7ffae905dae8>, when = 'call'

    def __init__(self, func, when):
        #: context of invocation: one of "setup", "call",
        #: "teardown", "memocollect"
        self.when = when
        self.start = time()
        try:
>           self.result = func()

env/local/lib/python3.5/site-packages/_pytest/runner.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

>   return CallInfo(lambda: ihook(item=item, **kwds), when=when)

env/local/lib/python3.5/site-packages/_pytest/runner.py:151: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_HookCaller 'pytest_runtest_call'>
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}

    def __call__(self, **kwargs):
        assert not self.is_historic()
>       return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_runtest_call'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}

    def _hookexec(self, hook, methods, kwargs):
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
>       return self._inner_hookexec(hook, methods, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

hook = <_HookCaller 'pytest_runtest_call'>, methods = []
kwargs = {'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>, 'item': <Function 'test_channel_message_handler'>}

    self._inner_hookexec = lambda hook, methods, kwargs: \
>       _MultiCall(methods, kwargs, hook.spec_opts).execute()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
>               return _wrapped_call(hook_impl.function(*args), self.execute)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

wrap_controller = <generator object pytest_runtest_call at 0x7ffaea4b4f10>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>

    def _wrapped_call(wrap_controller, func):
        """ Wrap calling to a function with a generator which needs to yield
        exactly once.  The yield point will trigger calling the wrapped function
        and return its _CallOutcome to the yield point.  The generator then needs
        to finish (raise StopIteration) in order for the wrapped call to complete.
        """
        try:
            next(wrap_controller)   # first yield
        except StopIteration:
            _raise_wrapfail(wrap_controller, "did not yield")
        call_outcome = _CallOutcome(func)
        try:
            wrap_controller.send(call_outcome)
            _raise_wrapfail(wrap_controller, "has second yield")
        except StopIteration:
            pass
>       return call_outcome.get_result()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd4240>

    def get_result(self):
        if self.excinfo is None:
            return self.result
        else:
            ex = self.excinfo
            if _py3:
>               raise ex[1].with_traceback(ex[2])

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd4240>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>>

    def __init__(self, func):
        try:
>           self.result = func()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 0 meths, kwargs={'__multicall__': <_MultiCall 0 results, 0 meths, kwargs={...}>, 'item': <Function 'test_channel_message_handler'>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
                return _wrapped_call(hook_impl.function(*args), self.execute)
>           res = hook_impl.function(*args)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

item = <Function 'test_channel_message_handler'>

    def pytest_runtest_call(item):
        try:
>           item.runtest()

env/local/lib/python3.5/site-packages/_pytest/runner.py:104: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <Function 'test_channel_message_handler'>

    def runtest(self):
        """ execute the underlying test function. """
>       self.ihook.pytest_pyfunc_call(pyfuncitem=self)

env/local/lib/python3.5/site-packages/_pytest/python.py:1574: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_HookCaller 'pytest_pyfunc_call'>
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}

    def __call__(self, **kwargs):
        assert not self.is_historic()
>       return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:745: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.config.PytestPluginManager object at 0x7ffaece772b0>, hook = <_HookCaller 'pytest_pyfunc_call'>
methods = [<_pytest.vendored_packages.pluggy.HookImpl object at 0x7ffaeb4b8e10>]
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}

    def _hookexec(self, hook, methods, kwargs):
        # called from all hookcaller instances.
        # enable_tracing will set its own wrapping function at self._inner_hookexec
>       return self._inner_hookexec(hook, methods, kwargs)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:339: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

hook = <_HookCaller 'pytest_pyfunc_call'>, methods = [<_pytest.vendored_packages.pluggy.HookImpl object at 0x7ffaeb4b8e10>]
kwargs = {'__multicall__': <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>, 'pyfuncitem': <Function 'test_channel_message_handler'>}

    self._inner_hookexec = lambda hook, methods, kwargs: \
>       _MultiCall(methods, kwargs, hook.spec_opts).execute()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:334: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
>               return _wrapped_call(hook_impl.function(*args), self.execute)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:613: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

wrap_controller = <generator object pytest_pyfunc_call at 0x7ffae9046e60>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>>

    def _wrapped_call(wrap_controller, func):
        """ Wrap calling to a function with a generator which needs to yield
        exactly once.  The yield point will trigger calling the wrapped function
        and return its _CallOutcome to the yield point.  The generator then needs
        to finish (raise StopIteration) in order for the wrapped call to complete.
        """
        try:
            next(wrap_controller)   # first yield
        except StopIteration:
            _raise_wrapfail(wrap_controller, "did not yield")
        call_outcome = _CallOutcome(func)
        try:
            wrap_controller.send(call_outcome)
            _raise_wrapfail(wrap_controller, "has second yield")
        except StopIteration:
            pass
>       return call_outcome.get_result()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:254: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd42b0>

    def get_result(self):
        if self.excinfo is None:
            return self.result
        else:
            ex = self.excinfo
            if _py3:
>               raise ex[1].with_traceback(ex[2])

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:279: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_pytest.vendored_packages.pluggy._CallOutcome object at 0x7ffae8dd42b0>
func = <bound method _MultiCall.execute of <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>>

    def __init__(self, func):
        try:
>           self.result = func()

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:265: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_MultiCall 0 results, 1 meths, kwargs={'pyfuncitem': <Function 'test_channel_message_handler'>, '__multicall__': <_MultiCall 0 results, 1 meths, kwargs={...}>}>

    def execute(self):
        all_kwargs = self.kwargs
        self.results = results = []
        firstresult = self.specopts.get("firstresult")
    
        while self.hook_impls:
            hook_impl = self.hook_impls.pop()
            try:
                args = [all_kwargs[argname] for argname in hook_impl.argnames]
            except KeyError:
                for argname in hook_impl.argnames:
                    if argname not in all_kwargs:
                        raise HookCallError(
                            "hook call must provide argument %r" % (argname,))
            if hook_impl.hookwrapper:
                return _wrapped_call(hook_impl.function(*args), self.execute)
>           res = hook_impl.function(*args)

env/local/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py:614: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

pyfuncitem = <Function 'test_channel_message_handler'>

    @pytest.mark.tryfirst
    def pytest_pyfunc_call(pyfuncitem):
        """
        Run asyncio marked test functions in an event loop instead of a normal
        function call.
        """
        for marker_name, fixture_name in _markers_2_fixtures.items():
            if marker_name in pyfuncitem.keywords:
                event_loop = pyfuncitem.funcargs[fixture_name]
    
                funcargs = pyfuncitem.funcargs
                testargs = {arg: funcargs[arg]
                            for arg in pyfuncitem._fixtureinfo.argnames}
                event_loop.run_until_complete(
>                   asyncio.async(pyfuncitem.obj(**testargs), loop=event_loop))

env/local/lib/python3.5/site-packages/pytest_asyncio/plugin.py:77: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>
future = <Task pending coro=<test_channel_message_handler() running at /home/cm/work/aredis/tests/test_pubsub.py:277> wait_for=<Future pending cb=[Task._wakeup()]> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]>

    def run_until_complete(self, future):
        """Run until the Future is done.
    
            If the argument is a coroutine, it is wrapped in a Task.
    
            WARNING: It would be disastrous to call run_until_complete()
            with the same coroutine twice -- it would wrap it in two
            different Tasks and that can't be good.
    
            Return the Future's result, or raise its exception.
            """
        self._check_closed()
    
        new_task = not isinstance(future, futures.Future)
        future = tasks.ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False
    
        future.add_done_callback(_run_until_complete_cb)
        try:
>           self.run_forever()

/usr/lib/python3.5/asyncio/base_events.py:375: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>

    def run_forever(self):
        """Run until stop() is called."""
        self._check_closed()
        if self.is_running():
            raise RuntimeError('Event loop is running.')
        self._set_coroutine_wrapper(self._debug)
        self._thread_id = threading.get_ident()
        try:
            while True:
>               self._run_once()

/usr/lib/python3.5/asyncio/base_events.py:345: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=False debug=False>

    def _run_once(self):
        """Run one full iteration of the event loop.
    
            This calls all currently ready callbacks, polls for I/O,
            schedules the resulting callbacks, and finally schedules
            'call_later' callbacks.
            """
    
        sched_count = len(self._scheduled)
        if (sched_count > _MIN_SCHEDULED_TIMER_HANDLES and
            self._timer_cancelled_count / sched_count >
                _MIN_CANCELLED_TIMER_HANDLES_FRACTION):
            # Remove delayed calls that were cancelled if their number
            # is too high
            new_scheduled = []
            for handle in self._scheduled:
                if handle._cancelled:
                    handle._scheduled = False
                else:
                    new_scheduled.append(handle)
    
            heapq.heapify(new_scheduled)
            self._scheduled = new_scheduled
            self._timer_cancelled_count = 0
        else:
            # Remove delayed calls that were cancelled from head of queue.
            while self._scheduled and self._scheduled[0]._cancelled:
                self._timer_cancelled_count -= 1
                handle = heapq.heappop(self._scheduled)
                handle._scheduled = False
    
        timeout = None
        if self._ready or self._stopping:
            timeout = 0
        elif self._scheduled:
            # Compute the desired timeout.
            when = self._scheduled[0]._when
            timeout = max(0, when - self.time())
    
        if self._debug and timeout != 0:
            t0 = self.time()
            event_list = self._selector.select(timeout)
            dt = self.time() - t0
            if dt >= 1.0:
                level = logging.INFO
            else:
                level = logging.DEBUG
            nevent = len(event_list)
            if timeout is None:
                logger.log(level, 'poll took %.3f ms: %s events',
                           dt * 1e3, nevent)
            elif nevent:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: %s events',
                           timeout * 1e3, dt * 1e3, nevent)
            elif dt >= 1.0:
                logger.log(level,
                           'poll %.3f ms took %.3f ms: timeout',
                           timeout * 1e3, dt * 1e3)
        else:
>           event_list = self._selector.select(timeout)

/usr/lib/python3.5/asyncio/base_events.py:1276: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <selectors.EpollSelector object at 0x7ffae8dd41d0>, timeout = -1

    def select(self, timeout=None):
        if timeout is None:
            timeout = -1
        elif timeout <= 0:
            timeout = 0
        else:
            # epoll_wait() has a resolution of 1 millisecond, round away
            # from zero to wait *at least* timeout seconds.
            timeout = math.ceil(timeout * 1e3) * 1e-3
    
        # epoll_wait() expects `maxevents` to be greater than zero;
        # we want to make sure that `select()` can be called when no
        # FD is registered.
        max_ev = max(len(self._fd_to_key), 1)
    
        ready = []
        try:
>           fd_event_list = self._epoll.poll(timeout, max_ev)
E           KeyboardInterrupt

/usr/lib/python3.5/selectors.py:441: KeyboardInterrupt

Failing Tests pass if fixtures use event loop

This came up when I was writing tests for some code that used asyncpg.

Essentially, if a pytest fixture uses the @pytest.mark.asyncio decorator or just directly calls the event loop from inside a synchronous function, the async tests which require that fixture will pass regardless of their content.

I've put together a minimal example here https://github.com/cshenton/pytest-asyncio-bug that only uses the standard library and pytest.

There are async tests, each of which awaits asyncio.sleep() then calls assert False. They all depend on a fixture which also calls asyncio.sleep(), in different ways, and they all pass on my machine.

DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()

I am getting a ton of these warnings during test runs:

.tox/py36/lib/python3.6/site-packages/pytest_asyncio/plugin.py:77: DeprecationWarning: asyncio.async() function is deprecated, use ensure_future()
    asyncio.async(pyfuncitem.obj(**testargs), loop=event_loop))

Environment:

  • Python 3.6.1
  • pytest 3.1.0
  • pytest-asyncio 0.5.0

Need some guidance

I am having problems testing a coroutine. The coroutine is called within a asyncio.gather call and is a method within a class. The code is functional and runs as expected, but I can't seem to figure out how to create a working test case using pytest-asyncio.

Here is the coroutine code:

    @asyncio.coroutine
    def normalize_button_value(self, data, position):
        """
        Button data is returned from the Esplora in negative logic format.
        This method corrects this so that a button press is a "1" and a release is "0"
        :param data: A list of sensor data elements
        :param position: Position in the list for this button
        :return: Corrected button data
        """
        if data[position] == '0':
            return '1'
        else:
            return '0'

My test code is a method in a test class that inherits from TestCase:

    @pytest.mark.asyncio
    def test_normalize_button_value(self, event_loop):
        my_serial = EsploraSerial("/dev/ttyACM0", self.q, self.cond)
        resp = event_loop.run_until_complete(my_serial.normalize_button_value(self.list_test_data, 1))
        self.assertEquals(resp, '0')

The test fails and here is the output from the test:

self = <unittest.case._Outcome object at 0xb4b883ec>
test_case = <test.test_esploraSerial.TestEsploraSerial testMethod=test_normalize_button_value>
isTest = True

    @contextlib.contextmanager
    def testPartExecutor(self, test_case, isTest=False):
        old_success = self.success
        self.success = True
        try:
>           yield

/usr/local/lib/python3.4/unittest/case.py:58: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <test.test_esploraSerial.TestEsploraSerial testMethod=test_normalize_button_value>
result = <TestCaseFunction 'test_normalize_button_value'>

    def run(self, result=None):
        orig_result = result
        if result is None:
            result = self.defaultTestResult()
            startTestRun = getattr(result, 'startTestRun', None)
            if startTestRun is not None:
                startTestRun()

        result.startTest(self)

        testMethod = getattr(self, self._testMethodName)
        if (getattr(self.__class__, "__unittest_skip__", False) or
            getattr(testMethod, "__unittest_skip__", False)):
            # If the class or method was skipped.
            try:
                skip_why = (getattr(self.__class__, '__unittest_skip_why__', '')
                            or getattr(testMethod, '__unittest_skip_why__', ''))
                self._addSkip(result, self, skip_why)
            finally:
                result.stopTest(self)
            return
        expecting_failure = getattr(testMethod,
                                    "__unittest_expecting_failure__", False)
        outcome = _Outcome(result)
        try:
            self._outcome = outcome

            with outcome.testPartExecutor(self):
                self.setUp()
            if outcome.success:
                outcome.expecting_failure = expecting_failure
                with outcome.testPartExecutor(self, isTest=True):
>                   testMethod()
E                   TypeError: test_normalize_button_value() missing 1 required positional argument: 'event_loop'

Could you please provide some guidance as to what I may be doing wrong?

Thanks

ScopeMismatch error when adding fixture as parameter

I got a ScopeMismatch error when tried to add fixture to my test fucntion in a class when I add the scope parameter (scope='class' or scope='module'):
My case:

import asyncio
import pytest
import pytest_asyncio
from .database import DB

@pytest.fixture(scope='class')
async def db_setup(request):
    print("\nconnect to db")
    db = await DB.create()
    async def resource_teardown():
        await db.close()
        print("\ndisconnect")
    request.addfinalizer(resource_teardown)
    return db


class TestDB:
    @pytest.mark.asyncio
    async def test_connection(self, event_loop, db_setup):
        db = await db_setup
        with await db._pool as redis:
            res = await redis.ping()
            print(res)
            assert res, "PONG"

And when i run it i get :

 ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'class' scoped request object, involved factories
../../../../../.virtualenvs/jwt_auth/lib/python3.5/site-packages/pytest_asyncio/plugin.py:110:  def wrapper(*args, **kwargs)


==================================== ERRORS ====================================
___________________ ERROR at setup of TestDB.test_connection ___________________
ScopeMismatch: You tried to access the 'function' scoped fixture 'event_loop' with a 'class' scoped request object, involved factories
../../../../../.virtualenvs/jwt_auth/lib/python3.5/site-packages/pytest_asyncio/plugin.py:110:  def wrapper(*args, **kwargs)
============================== 5 tests deselected ==============================
==================== 5 deselected, 1 error in 0.17 seconds =====================

Process finished with exit code 0

I tried to delete the event_loop param but it gave the same error but when i removed scope parameter from the fixture everithing works

Automatically mark async test functions

I have a test suite that uses pytest-asyncio for some of its tests and it works pretty well. Thank you for creating such a useful plugin.

Enhancement Request

All of my coroutine test functions are declared with async. To avoid marking every such function with @pytest.mark.asyncio, in the root conftest.py for that suite I have added the following pytest hook:

import pytest
import inspect

def pytest_collection_modifyitems(session, config, items):
    for item in items:
        if isinstance(item, pytest.Function) and inspect.iscoroutinefunction(item.function):
            item.add_marker(pytest.mark.asyncio)

So far I have not managed to find any drawback to doing this sort of thing - the async keyword indicates coroutine functions just as clearly as the decorator does, and this approach seems to correctly mark each async test.

Could something like this be a part of pytest-asyncio itself, or is there a rationale for not including such a feature?

I understand that this project existed before Python had async/await semantics; however, you appear to have dropped support for Python < 3.5 (#57), which means that every supported Python version also supports the async keyword. Furthermore, I am not suggesting the removal or deprecation of the decorator approach since there are probably valid use cases for creating coroutines without using async.

If this kind of feature would be appreciated, I can contribute the changes myself when I can find the time.

Need to call `asyncio.set_event_loop()` in my tests

Hi there!

I'm porting some of my tests from v0.3.0 to v0.5.0 and I have a test that behaves strangely when I switch to v0.4.1 or later.

After some debugging, I noticed that id(event_loop) inside the test and id(asyncio.get_event_loop()) return different results for versions after v0.4.1 and v0.5.0.

After adding a call to asyncio.set_event_loop(event_loop) in my tests like this:

def test_serve(event_loop, cli):
    asyncio.set_event_loop(event_loop)  # wasn't required with v0.3.0.

Notice that I'm not using @pytest.mark.asyncio here. I can't use it because I'm testing a click CLI that calls event_loop.run_until_complete() internally.

I guess this is explained by the fact that the event_loop fixture stopped calling asyncio.set_event_loop() when the @pytest.mark.asyncio(forbid_global_event_loop=...) syntax was introduced in v0.4.0.

In short, it seems like using only the event_loop fixture (e.g. without @pytest.mark.asyncio) leads to a surprising result. I thing that the (absence of a) relationship between the event_loop fixture and asyncio.get_event_loop() should be added to the documentation to make it more obvious. If you're OK with this, I can send in a PR to fix it.

In passing, I think there's a big quirk in the API here: the behaviour for using the event_loop fixture without the asyncio marker is surprising, to say to least. The havior in v0.3.0 was much better than it is now in that respect. This seems to be caused by the fact that forbid_global_event_loop is marker parameter rather than a fixture. If the policy for controlling the relationship with the default event loop were a fixture, it would allow the event_loop fixture to decide what to do with the default event loop.

I've seen @asvetlov's comments in issue #12 and PR #24. There seems to be some contention over whether returning exceptions from asyncio.get_event_loop() should be the default or not. I don't have a specific position on this, but I am convinced that either it should return the same value as the event_loop fixture or it should fail (having two event loops by default is not ideal). A neat side-effect of making the policy for the default event loop a fixture on which the event_loop fixture depends (instead of a parameter to the marker) would be to allow defining a project-specific default: each project would be able to define this new fixture as something returning a boolean (or enum value) to choose the policy and use autouse to make it a project specific default.

What are your thoughts on this?

Would you find it useful if you could reliably fail tests that have unawaited coroutines?

Right now, this test passes:

async def do_something_broken():
    assert False

@pytest.mark.asyncio
async def test_something_broken():
    do_something_broken()

The reason is that we forgot the await in test_something_broken, so the broken code never actually ran. Oops. Python does issue a RuntimeWarning: coroutine 'do_something_broken' was never awaited, and recent pytest will print this at the end of tests, but this has a few issues:

  • if your CI is green then how often do you click through to check for non-fatal warnings?
  • since the warning isn't issued until the coroutine is garbage collected, on PyPy this can happen in some random other test, or if the test is near the end of the run it might never happen at all. E.g. with latest pypy3, pytest, and pytest-asyncio, the above test doesn't issue any warnings at all:
============================= test session starts ==============================
platform linux -- Python 3.5.3[pypy-5.8.0-beta], pytest-3.2.2, py-1.4.34, pluggy-0.4.0
rootdir: /tmp, inifile:
plugins: cov-2.5.1, catchlog-1.2.2, asyncio-0.7.0
collected 1 item                                                                

../../tmp/test.py .

=========================== 1 passed in 0.02 seconds ===========================

I'm considering proposing a new feature for Python 3.7, that would make it so pytest-asyncio could do:

# Ask Python to start maintaining a list of unawaited coroutines
sys.set_unawaited_coroutine_tracking(True)
try:
    ... run the test ...
finally:
    # Get the unawaited coroutines
    unawaited_coroutines = sys.get_and_clear_unawaited_coroutines()
    sys.set_unawaited_coroutine_tracking(False)
    if unawaited_coroutines:
        # Issue an error that points to the actual problem
        raise RuntimeError(f"Unawaited coroutines: {unawaited_coroutines}")

(Names etc. to be bikeshedded later; this is "API 2" in python-trio/trio#79 (comment))

This way you could deterministically detect unawaited coroutines, reliably attribute them to the correct test, and cause it to fail with a useful error message.

Is this an API that you'd want to take advantage of if it were available?

Make `loop` fixture as alias for `event_loop`

In asyncio world parameter for event loop is always called loop, never event_loop.
I think we should use the same name for fixture (with keeping backward compatibility, sure).

Also disabling global loop by asyncio.set_event_loop(None) is useful feature.
I always run my tests in isolated environment.
Global loop may be an option (nondefault) for loop fixture.
event_loop should preserve existing global strategy but may accept option for disabling it (or we may keep it untouched encouraging people to use loop fixture).

All async fixtures must be scoped to a function

I have a session scoped fixture that runs a Postgres database in the background.
I'd like the connection pool to that database to last for the entire session for the entire session as well.

This fails because the event_loop fixture is scoped to a function.

Is there a workaround for this problem?

Python 3.4 now not supported

  File "/home/travis/build/mosquito/aio-pika/.tox/py34/lib/python3.4/site-packages/pkg_resources/__init__.py", line 2308, in resolve
    module = __import__(self.module_name, fromlist=['__name__'], level=0)
  File "/home/travis/build/mosquito/aio-pika/.tox/py34/lib/python3.4/site-packages/pytest_asyncio/plugin.py", line 81
    async def setup():

async generators pytest fixtures

Python 3.6 have introduced new async generator entity:

async def foo():
    yield 'Foo'

When trying to create yield fixture in pytest (or a simple fixture but with async def) and decorate it with @pytest.mark.asyncio i receive a coroutine instead of fixture result.

Is there a way to make pytest.mark.asyncio recognize fixtures as coroutine functions or async generator functions and behave properly on them?

event loop is closed before my fixture's finalizer runs

Consider this:

import pytest


async def foo():
    print("foo")


@pytest.yield_fixture
def my_fixture(event_loop):
    print(event_loop.is_closed()) # "False"
    event_loop.run_until_complete(foo()) # "foo"

    yield None

    print(event_loop.is_closed()) # "True"... noooo!
    event_loop.run_until_complete(foo()) # goes boom


@pytest.mark.asyncio
async def test_foo(my_fixture):
    await foo()

Run this code with py.test -s <file> and you'll see the output as indicated by the comments in the code.

The event loop is closed before my finalizer code can run. This is also the case when I write my fixture without a yield, instead adding a finalizer function.

This is weird. I'm pretty sure that this worked just fine a couple of months ago.

Cannot create subprocess when using `forbid_global_loop`

I just added the forbid_global_loop parameter to the asyncio marker. Most tests pass fine, I fixed a few others with accidental references to the default event loop. However, some of them fail in a strange way:

asyncio/base_events.py:342: in run_until_complete
    return future.result()
asyncio/futures.py:274: in result
    raise self._exception
asyncio/tasks.py:239: in _step
    result = coro.send(value)
gitmesh/storage.py:35: in check_output
    loop=loop,
asyncio/subprocess.py:197: in create_subprocess_shell
    stderr=stderr, **kwds)
asyncio/base_events.py:940: in subprocess_shell
    protocol, cmd, True, stdin, stdout, stderr, bufsize, **kwargs)
asyncio/unix_events.py:179: in _make_subprocess_transport
    with events.get_child_watcher() as watcher:
asyncio/events.py:604: in get_child_watcher
    return get_event_loop_policy().get_child_watcher()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pytest_asyncio.plugin.ForbiddenEventLoopPolicy object at 0x103dbba20>

    def get_child_watcher(self):
        "Get the watcher for child processes."
>       raise NotImplementedError
E       NotImplementedError

Seems like, at least on OSX, the asyncio subprocess facilities depend on an undocumented get_child_watcher() method of the event loop policy.

I'm guessing part of this problem exceeds the scope of this project: custom event loops must conform to an undocumented interface to be able to support subprocesses. I'm guessing this needs to be discussed with asyncio maintainers.

However, I guess you'll also want to do something about this in this library because pytest_asyncio.plugin.ForbiddenEventLoopPolicy currently doesn't implement this hidden interface.

How to fix "attached to a different loop"?

I have a very simple app called "myapp". It uses the AsyncElasticsearch client:

from elasticsearch_async import AsyncElasticsearch

def create_app():
    app = dict()
    app['es_client'] = AsyncElasticsearch('http://index:9200/')
    app['stuff'] = Stuff(app['es_client'])
    return app

class Stuff:
    def __init__(self, es_client):
        self.es_client = es_client

    def do_async_stuff(self):
        return self.es_client.index(index='test',
                                    doc_type='test',
                                    body={'field': 'sample content'})

My question is not about AsyncElasticsearch, it just happens to be an async thing I want to work with, could be sth else like a Mongo driver or whatever.

I want to test do_async_stuff() and wrote the following conftest.py

import pytest
from myapp import create_app

@pytest.fixture(scope='session')
def app():
    return create_app()

... and test_stuff.py

import pytest

@pytest.mark.asyncio
async def test_stuff(app):
    await app['stuff'].do_async_stuff()
    assert True

When I execute the test I get an exception with the message "attached to a different loop". Digging into that matter I found that pytest-asyncio creates a new event_loop for each test case (right?). The Elasticsearch client however, takes the default loop on instantiation and sticks with it. So I tried to convince it to use the pytest-asyncio event_loop like so:

import pytest

@pytest.mark.asyncio
async def test_stuff(app, event_loop):
    app['es_client'].transport.loop = event_loop
    await app['stuff'].do_async_stuff()
    assert True

This however gives me another exception:

__________________________________ test_stuff __________________________________

app = {'es_client': <Elasticsearch([{'host': 'index', 'port': 9200, 'scheme': 'http'}])>, 'stuff': <myapp.Stuff object at 0x7ffbbaff1860>}

    @pytest.mark.asyncio
    async def test_stuff(app):
>       await app['stuff'].do_async_stuff()

test/test_stuff.py:6: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>>

    def __iter__(self):
        if not self.done():
            self._blocking = True
>           yield self  # This tells Task to wait for completion.
E           RuntimeError: Task <Task pending coro=<test_stuff() running at /srv/app/backend/test/test_stuff.py:6> cb=[_run_until_complete_cb() at /usr/lib/python3.5/asyncio/base_events.py:164]> got Future <Task pending coro=<AsyncTransport.main_loop() running at /usr/local/lib/python3.5/dist-packages/elasticsearch_async/transport.py:133>> attached to a different loop

How am I supposed to test this scenario?

Run tests asynchronously

Hi,

I'm not sure this is the purpose of this library but I want to run pytest tests asynchronously.

Consider this example:

import asyncio
import pytest


@pytest.mark.asyncio
async def test_1():
    await asyncio.sleep(2)


@pytest.mark.asyncio
async def test_2():
    await asyncio.sleep(2)
$ py.test -q
..
2 passed in 4.01 seconds

It would be nice to run the test suite in ~2 seconds instead of 4. Is it currently possible with pytest-asyncio or with another library ? I guess we would need to asyncio.gather() all async tests and run them in the same event loop.

Thanks !

Mock Stubbing and Monkey Patching

Does mock monkey patching work with pytest-dev?

I have 3 similar tests that allow me to stub out an instantiation of a pyserial object. The mock stub works in all 3 tests I have included below. However, I am also trying to monkeypatch a call to the get() method on a queue.

The first test succeeds, but I am doing a direct call to get and not yield from .

The second test is essentially the same as the first, but I use a yield from.

The third test just changes the return statement to a yield in the mockreturn function.

Below are the 3 test cases and the outputs from the failures.

Thanks,
Alan

import asyncio
import pytest
import serial

from esplora_serial import EsploraSerial


def test_some_interaction(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)

    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')

    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    #create a mock return for the queue "get"
    def mockreturn(get):
        return '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

    resp = q.get()
    assert resp == "/abc"


@pytest.mark.asyncio
def test_some_interaction2(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)

    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')

    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    #create a mock return for the queue "get"
    def mockreturn(get):
        return '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

    resp = yield from  q.get()
    assert resp == "/abc"

@pytest.mark.asyncio
def test_some_interaction3(monkeypatch, mocker):
    # create a queue
    q = asyncio.LifoQueue(maxsize=1)

    # stub out the call to serial.Serial in the EsploraSerial class __init__
    mocker.patch('serial.Serial')

    # instantiate the Esplora Serial class
    my_serial = EsploraSerial("/dev/ttyACM0", q)

    #create a mock return for the queue "get"
    def mockreturn(get):
        yield '/abc'

    # monkey patch the lifo get to return "/abc"
    monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

    resp = yield from  q.get()
    assert resp == "/abc"

And now the test output:

/usr/local/bin/python3 /home/afy/PycharmProjects/pyvmmonitor/public_api/pyvmmonitor/__init__.py --profile=lsprof --spawn-ui=false /home/afy/pycharm-4.0.4/helpers/pycharm/pytestrunner.py -p pytest_teamcity /home/afy/PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py
Testing started at 7:26 PM ...
Patching args: ['/bin/sh', '-c', 'uname -p 2> /dev/null']
Process is not python, returning.
============================= test session starts ==============================
platform linux -- Python 3.4.3 -- py-1.4.26 -- pytest-2.7.0
rootdir: /home/afy/PycharmProjects/esp4s-aio/test, inifile: 
plugins: asyncio, mock
collected 3 items

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py .F
monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b146cc>
mocker = <pytest_mock.MockFixture object at 0xb4b1478c>

    @pytest.mark.asyncio
    def test_some_interaction2(monkeypatch, mocker):
        # create a queue
        q = asyncio.LifoQueue(maxsize=1)

        # stub out the call to serial.Serial in the EsploraSerial class __init__
        mocker.patch('serial.Serial')

        # instantiate the Esplora Serial class
        my_serial = EsploraSerial("/dev/ttyACM0", q)

        #create a mock return for the queue "get"
        def mockreturn(get):
            return '/abc'

        # monkey patch the lifo get to return "/abc"
        monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

>       resp = yield from  q.get()
E       RuntimeError: Task got bad yield: '/'

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:115: RuntimeError
F
monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b6156c>
mocker = <pytest_mock.MockFixture object at 0xb4b616cc>

    @pytest.mark.asyncio
    def test_some_interaction3(monkeypatch, mocker):
        # create a queue
        q = asyncio.LifoQueue(maxsize=1)

        # stub out the call to serial.Serial in the EsploraSerial class __init__
        mocker.patch('serial.Serial')

        # instantiate the Esplora Serial class
        my_serial = EsploraSerial("/dev/ttyACM0", q)

        #create a mock return for the queue "get"
        def mockreturn(get):
            yield '/abc'

        # monkey patch the lifo get to return "/abc"
        monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

>       resp = yield from  q.get()

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:136: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

get = <LifoQueue at 0xb4b615ac maxsize=1>

    def mockreturn(get):
>       yield '/abc'
E       RuntimeError: Task got bad yield: '/abc'

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:131: RuntimeError


=================================== FAILURES ===================================
____________________________ test_some_interaction2 ____________________________

monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b146cc>
mocker = <pytest_mock.MockFixture object at 0xb4b1478c>

    @pytest.mark.asyncio
    def test_some_interaction2(monkeypatch, mocker):
        # create a queue
        q = asyncio.LifoQueue(maxsize=1)

        # stub out the call to serial.Serial in the EsploraSerial class __init__
        mocker.patch('serial.Serial')

        # instantiate the Esplora Serial class
        my_serial = EsploraSerial("/dev/ttyACM0", q)

        #create a mock return for the queue "get"
        def mockreturn(get):
            return '/abc'

        # monkey patch the lifo get to return "/abc"
        monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

>       resp = yield from  q.get()
E       RuntimeError: Task got bad yield: '/'

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:115: RuntimeError
____________________________ test_some_interaction3 ____________________________

monkeypatch = <_pytest.monkeypatch.monkeypatch object at 0xb4b6156c>
mocker = <pytest_mock.MockFixture object at 0xb4b616cc>

    @pytest.mark.asyncio
    def test_some_interaction3(monkeypatch, mocker):
        # create a queue
        q = asyncio.LifoQueue(maxsize=1)

        # stub out the call to serial.Serial in the EsploraSerial class __init__
        mocker.patch('serial.Serial')

        # instantiate the Esplora Serial class
        my_serial = EsploraSerial("/dev/ttyACM0", q)

        #create a mock return for the queue "get"
        def mockreturn(get):
            yield '/abc'

        # monkey patch the lifo get to return "/abc"
        monkeypatch.setattr(asyncio.LifoQueue, 'get', mockreturn)

>       resp = yield from  q.get()

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:136: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

get = <LifoQueue at 0xb4b615ac maxsize=1>

    def mockreturn(get):
>       yield '/abc'
E       RuntimeError: Task got bad yield: '/abc'

PycharmProjects/esp4s-aio/test/test_esp4sHttpServer.py:131: RuntimeError
====================== 2 failed, 1 passed in 0.03 seconds ======================

Process finished with exit code 0

Async fixture teardown

Hi!
I have question regarding how asynchronous fixture teardown (e.g. closing a connection that was open asynchronously) is supposed to work.

When I have a fixture such as:

@pytest.fixture(scope='session')
@async_generator
async def fixture():
    # ... async opening of a connection
    connection = await asyncpg.connect(...)
    await yield_(connection)
    await connection.close()

I am getting the following error: ValueError: Async generator fixture didn't stop.Yield only once.

Bug: Async fixtures with async fixtures

Tested with current head, f6d84f6

The following test case fails

import asyncio
import pytest

@pytest.fixture()
async def async_inner_fixture():
    await asyncio.sleep(0.01)
    print('inner async_fixture start')
    yield True
    print('inner async_fixture stop')


@pytest.fixture()
async def async_fixture(async_inner_fixture):
    await asyncio.sleep(0.01)
    print('async_fixture start')
    yield True
    assert async_inner_fixture is True
    print('async_fixture stop')


@pytest.mark.asyncio
async def test_async_fixture(async_fixture):
    assert async_fixture is True
    print('test_async_fixture')

with the following error

F async_fixture start
test_async_fixture
 
tests/test_integration.py:66 (test_async_fixture)
async_inner_fixture = <async_generator object async_inner_fixture at 0x7f7db1f5d868>

    @pytest.fixture()
    async def async_fixture(async_inner_fixture):
        await asyncio.sleep(0.01)
        print('async_fixture start')
        yield True
>       assert async_inner_fixture is True
E       assert <async_generator object async_inner_fixture at 0x7f7db1f5d868> is True

tests/test_integration.py:63: AssertionError

I.e. async_inner_fixture is not working properly when used in async_fixture.

"-k" option does not behave correctly

Background

-k EXPRESSION         only run tests which match the given substring
                        expression. An expression is a python evaluatable
                        expression where all names are substring-matched
                        against test names and their parent classes. Example:
                        -k 'test_method or test_other' matches all test
                        functions and classes whose name contains
                        'test_method' or 'test_other', while -k 'not
                        test_method' matches those that don't contain
                        'test_method' in their names. Additionally keywords
                        are matched to classes and functions containing extra
                        names in their 'extra_keyword_matches' set, as well as
                        functions which have names assigned directly to them.

Steps to reproduce

  1. Use code
@pytest.mark.asyncio
async def test_bug():
    pass

@pytest.mark.asyncio
async def test_integration():
    pass
  1. Run pytest with -k test_integration

Observed output:
core/test_integration.py::test_bug PASSED
core/test_integration.py::test_integration PASSED

Expected output:
core/test_integration.py::test_integration PASSED

Note that -k test_bug works fine.

attaching a watcher to an event loop breaks finalizer

In order to be able to handle asyncio.subprocesses with both stdout, stderr callbacks one need to have a child watcher that's attached to the loop, so the event_loop needs to look something like:

def get_event_loop():
    if sys.platform == 'win32':
        return asyncio.ProactorEventLoop()  # on windows IO needs this
    return asyncio.new_event_loop()  # default on UNIX is fine

@pytest.yield_fixture()
def event_loop():
    """pytest-asyncio customization"""
    loop = get_event_loop()
    if sys.platform != "win32":
        # on UNIX we also need to attach the loop to the child watcher for asyncio.subprocess
        policy = asyncio.get_event_loop_policy()
        watcher = asyncio.SafeChildWatcher()
        watcher.attach_loop(loop)
        policy.set_child_watcher(watcher)
    try:
        yield loop
    finally:
        loop.close()

Now the problem is that pytest-asyncio finalizer tries to attach back to the old event loop: https://github.com/pytest-dev/pytest-asyncio/blob/master/pytest_asyncio/plugin.py#L139, and at that point all watchers are also attached to that loop too. However, you cannot attach to a closed loop, so if the old loop is closed the finalizer fixture will throw an exception: raise RuntimeError('Event loop is closed') 😞

points of interest:
attach_loop for watcher - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L758
add_signal_handler check closed - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L84
set_event_loop for policy - https://github.com/python/cpython/blob/master/Lib/asyncio/unix_events.py#L1007

As a work around setting the event loop at the start to None solves this, however maybe we should not attach back to closed loops, and instead just remove those loops.

Tests don't fail anymore

having a minimal:

from pytest import mark
@mark.xfail(reason="I need a failing 
def test_fail_please():
     assert False

@mark.xfail(reason="I'm testing for a pytest-asyncio-bug - if I don't fail something is broken.")
@mark.asyncio
def test_fail_async():
     assert False

test_fail_async fails as expected with asyncio==0.3.0, but succeeds with asyncio==0.4.0.

(running python3.5.1)

[Feature request] add a loop fixture

People very often asked why pytest-asyncio and pytest-aiohttp are not compatible.
Well, AFAIK they are not mutually exclusive but the compatibility is not perfect still.

I had very strong objection for pytest-asyncio behavior: from my perspective event_loop fixture should call asyncio.set_event_loop(None) before running a test. It's prevent forgetting to pass loop param to calls like asyncio.sleep() etc.

But not I withdraw my objection: since Python 3.5.3 the coroutine always gets a running event loop regardless to asyncio.set_event_loop() param.

We still have many differences between pytest-asyncio and pytest-aiohttp: a need for special @pytest.mark.asyncio markup for pytest-asyncio and event loop naming.

In pytest-asyncio it is event_loop but int pytest-aiohttp the name is loop.

The loop name is used everywhere in asyncio documentation, I love to keep it (at least the name is shorter).

@Tinche would you consider adding the loop fixture as an alias for event_loop?

I'm not a pytest jedi, in my mind the fixture should look like:

@pytest.yield_fixture
def loop(event_loop):
    return event_loop

but maybe you have a better idea.

Don't close the loop if it's already closed

Love the lib!

Right now I can't use it with code that closes the event loop. Could you make it so it check if loop.is_closed(), and if it is, not attempt to close (which make the test fails)?

0.6.0 version causing test failures with: There is no current event loop in thread 'MainThread'

Got Error:

RuntimeError: There is no current event loop in thread 'MainThread'.

For example:
https://travis-ci.org/botstory/botstory/builds/236994191#L594

Sources of unit test:
https://github.com/botstory/botstory/blob/develop/botstory/chat_test.py#L38

@pytest.mark.asyncio
async def test_should_say(mock_interface):
    with answer.Talk() as talk:
        story = talk.story
        story.use(mock_interface)

        @story.on('hi there!')
        def one_story():
            @story.part()
            async def then(ctx):
                await story.say('Nice to see you!', ctx['user'])

        await talk.pure_text('hi there!')

        mock_interface.send_text_message.assert_called_once_with(
            recipient=talk.user,
            text='Nice to see you!',
            options=None,
        )

And the same error for many other async tests which are marked with @pytest.mark.asyncio, with fixtures and without it.

Deps are: https://github.com/botstory/botstory/blob/develop/requirements.txt

aiohttp==2.1.0
motor==1.1
pytest==3.1.0
pytest-aiohttp==0.1.3
pytest-asyncio==0.5.0
pytest-catchlog==1.2.2
pytest-cov==2.5.1
pytest-flakes==2.0.0
pytest-mock==1.6.0
yarl==0.10.2

previous version works fine, except deprecation warning

Make loop accessible outside of test function

I am using pytest_asyncio for my asyncio tests and am trying to get it to work with another fixture I wrote. They work together fine as long as I only access the loop inside the test function by calling functions on the passed in fixture. I was trying to figure out if it was possible to access the loop in the fixture function itself (or even pytest_runtest_setup).

I'm not that familiar with py.test internals, but as far as I can tell from reading examples and source code, it looks like pytest_asyncio creates the loop right before calling the test function (after *_setup and fixture functions are called) and then closes the loop right after execution. Is this right?

Specifically, what I'm trying to do is kick off an asyncio.ensure_future() right before the test runs using the loop that the test will be running on.

Latest version causing test failures

Getting:

E           RuntimeError: Task <Task pending coro=<pytest_fixture_setup.<locals>.wrapper.<locals>.setup() running at /home/travis/build/plone/guillotina/eggs/pytest_asyncio-0.6.0-py3.6.egg/pytest_asyncio/plugin.py:120> cb=[_run_until_complete_cb() at /opt/python/3.6-dev/lib/python3.6/asyncio/base_events.py:176]> got Future <_GatheringFuture pending> attached to a different loop

on many of my tests now.

Downgrading to 0.5.0 works. Is there something I'm doing wrong in my tests?

Configuration option for a custom event loop policy and/or custom event loop class?

It would be useful when testing an application that uses a custom implementation of asyncio.AbstractEventLoop to be able to provide a custom event loop policy and/or custom event loop via a configuration option.

It is of course already possible to set the event loop policy globally with asyncio.set_event_loop_policy (e.g. in pytest_configure) or to override the event_loop fixture so that it returns a loop of the desired class, so perhaps this would be unnecessary.

Add async finalizers to the request-context

Hi,
My project is Python 3.5 only, and it's therefore impossible for me to use async generators in fixtures. But we might be able to workaround this if it were possible to define async finalizers to request.

E.g.

@pytest.fixture
async def foo_fixture(request):
    async def fin():
        await some_stuff()
    request.addasyncfinalizer(fin)
    return 42

Then it would await for each async finalizers before tear down.

Not sure whether it would involve monkey patching on the request object, or what would be acceptable. Maybe there are also more proper solutions that doesn't involve request or that make use of already existing features? I already tried to use loop.create_task() in a standard finalizer, unfortunately, the created task seem to execute after teardown which produces the following error message:
2017-06-26 18:15:50,262 - asyncio - ERROR - Task was destroyed but it is pending!

Many thanks!

Conflicting `pytest-aiohttp` and `pytest-asyncio`

See the aio-libs/pytest-aiohttp#8

I am not sure, if it is fault of one or the other plugin or there is a need on some naming convention not to clash one with another.

Personally I consider pytest-asyncio more general and would expect a fix on pytest-aiohttp side, but this is too early to tell before the reason is well described.

No version bump to 0.4.1?

Hi,

I see that there is a 0.4.1 version on pypi but the code here on GitHub is still on version 0.4.0. Can you publish the commit for 0.4.1? I am asking this because I am maintaining this package for fedora and I use the source from GitHub and I would like to update it to 0.4.1.

Could not find suitable distribution for Requirement.parse('pytest-asyncio')

easy_install is not capable[1] of install pytest-asyncio I know pip is the tool for the job but when you use setup.py to manage requirements it uses easy_install for it. I think the problem could be the name of wheel package that is pytest_asyncio with _.

[1] https://packaging.python.org/en/latest/pip_easy_install.html

$ easy_install pytest-asyncio
Searching for pytest-asyncio
Reading https://pypi.python.org/simple/pytest-asyncio/
No local packages or download links found for pytest-asyncio
error: Could not find suitable distribution for Requirement.parse('pytest-asyncio')
(test)wiliam@vostro:~/.../test$ easy_install pytest_asyncio
Searching for pytest-asyncio
Reading https://pypi.python.org/simple/pytest_asyncio/
No local packages or download links found for pytest-asyncio
error: Could not find suitable distribution for Requirement.parse('pytest-asyncio')

Support for test methods

Currently asyncio doesn't support test methods (see #4):

class Test:
    @pytest.mark.asyncio
    def test_some_asyncio_code(self):
        res = yield from library.do_something()
        assert b'expected result' == res

Changelog?

An upgrade to 0.4.1 broke all my tests and I’m unable to locate some kind of changelog that could give me cues what’s going wrong…could you start keeping one please? :)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.