
unsync's Introduction

unsync

Unsynchronize asyncio by using an ambient event loop, or executing in separate threads or processes.

Quick Overview

Functions marked with the @unsync decorator will behave in one of the following ways:

  • async functions will run in the unsync.loop event loop executed from unsync.thread
  • Regular functions will execute in unsync.thread_executor, a ThreadPoolExecutor
    • Useful for IO-bound work that does not support asyncio
  • Regular functions marked with @unsync(cpu_bound=True) will execute in unsync.process_executor, a ProcessPoolExecutor
    • Useful for CPU-bound work

All @unsync functions will return an Unfuture object. This new future type combines the behavior of asyncio.Future and concurrent.futures.Future with the following changes (a short sketch follows this list):

  • Unfuture.set_result is thread-safe, unlike asyncio.Future
  • Unfuture instances can be awaited, even if made from a concurrent.futures.Future
  • Unfuture.result() is a blocking operation, except in unsync.loop/unsync.thread, where it behaves like asyncio.Future.result and will throw an exception if the future is not done
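A minimal sketch of these behaviors (the function names here are illustrative, not part of the library):

import asyncio
import time
from unsync import unsync

@unsync
def blocking_io():
    time.sleep(0.1)  # regular function, so it runs in unsync.thread_executor
    return 'io'

@unsync
async def combine():
    # an Unfuture backed by a concurrent future can still be awaited
    return await blocking_io()

print(combine().result())  # result() blocks the calling thread until done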

Examples

Simple Sleep

A simple sleeping example with asyncio:

import asyncio

async def sync_async():
    await asyncio.sleep(1)
    return 'I hate event loops'


async def main():
    future1 = asyncio.create_task(sync_async())
    future2 = asyncio.create_task(sync_async())

    await asyncio.gather(future1, future2)

    print(future1.result() + future2.result())

asyncio.run(main())
# Takes 1 second to run

Same example with unsync:

import asyncio
from unsync import unsync

@unsync
async def unsync_async():
    await asyncio.sleep(1)
    return 'I like decorators'

unfuture1 = unsync_async()
unfuture2 = unsync_async()
print(unfuture1.result() + unfuture2.result())
# Takes 1 second to run

Multi-threading an IO-bound function

Synchronous functions can be made to run asynchronously by executing them in a concurrent.futures.ThreadPoolExecutor. This is easily accomplished by marking the regular function with @unsync.

import time
from unsync import unsync

@unsync
def non_async_function(seconds):
    time.sleep(seconds)
    return 'Run concurrently!'

start = time.time()
tasks = [non_async_function(0.1) for _ in range(10)]
print([task.result() for task in tasks])
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

['Run concurrently!', 'Run concurrently!', ...]
Executed in 0.10807514190673828 seconds

Continuations

Using Unfuture.then chains asynchronous calls and returns an Unfuture that wraps both the source and the continuation. The continuation is invoked with the source Unfuture as its first argument. Continuations can be regular functions (which will execute synchronously) or @unsync functions.

@unsync
async def initiate(request):
    await asyncio.sleep(0.1)
    return request + 1

@unsync
async def process(task):
    await asyncio.sleep(0.1)
    return task.result() * 2

start = time.time()
print(initiate(3).then(process).result())
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

8
Executed in 0.20314741134643555 seconds

Mixing methods

We'll start by converting a regular synchronous function into a threaded Unfuture which will begin our request.

@unsync
def non_async_function(num):
    time.sleep(0.1)
    return num, num + 1

We may want to refine the result in another function, so we define the following continuation.

@unsync
async def result_continuation(task):
    await asyncio.sleep(0.1)
    num, res = task.result()
    return num, res * 2

We then aggregate all the results into a single dictionary in an async function.

@unsync
async def result_processor(tasks):
    output = {}
    for task in tasks:
        num, res = await task
        output[num] = res
    return output

Executing the full chain of non_async_function → result_continuation → result_processor would look like:

start = time.time()
print(result_processor([non_async_function(i).then(result_continuation) for i in range(10)]).result())
print('Executed in {} seconds'.format(time.time() - start))

Which prints:

{0: 2, 1: 4, 2: 6, 3: 8, 4: 10, 5: 12, 6: 14, 7: 16, 8: 18, 9: 20}
Executed in 0.22115683555603027 seconds

Preserving typing

As far as we know, it is not possible to change the return type of a method or function using a decorator. Therefore, we need a workaround to properly use IntelliSense. In general you have three options:

  1. Ignore type warnings.

  2. Use a suppression statement where you hit the type warning (a sketch follows this list).

    A. When defining the unsynced method by changing the return type to an Unfuture.

    B. When using the unsynced method.

  3. Wrap the function without a decorator. Example:

    def function_name(x: str) -> Unfuture[str]:
        async_method = unsync(__function_name_synced)
        return async_method(x)
    
    def __function_name_synced(x: str) -> str:
        return x + 'a'
    
    future_result = function_name('b')
    assert future_result.result() == 'ba'
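For option 2B, a minimal sketch of suppressing/correcting the type at the call site with typing.cast (assuming Unfuture is importable and generic, as the option-3 example implies):

from typing import cast
from unsync import unsync, Unfuture

@unsync
async def append_a(x: str) -> str:  # IDE infers str, but the call actually returns an Unfuture
    return x + 'a'

future_result = cast('Unfuture[str]', append_a('b'))  # correct the type at the use site
assert future_result.result() == 'ba'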

Custom Event Loops

In order to use custom event loops, be sure to set the event loop policy before calling any @unsync methods. For example, to use uvloop simply:

from unsync import unsync
import uvloop

@unsync
async def main():
    # Main entry-point.
    ...

uvloop.install() # Equivalent to asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
main()

unsync's Issues

Update docs for Python 3.7

Examples in README should be updated for Python 3.7.

async def sync_async():
    await asyncio.sleep(0.1)
    return 'I hate event loops'

result = asyncio.run(sync_async())
print(result)

Things now get simpler with regular asyncio functions, but unsync still does its good magic :)

Unsync conda-forge recipe

This is more a notification than an issue, but just letting the maintainers here know that I created a conda-forge recipe for unsync, which lives in its own feedstock. It is installable in a conda environment with

conda install unsync -c conda-forge

I'm mentioning this as I attempted to paraphrase the description of what unsync is there, but I might have misrepresented it. Feel free to create issues for me to fix any descriptions or documentation there.

If any of the maintainers here would like to be added as a maintainer on the conda feedstock for permission to update the auto-generated PRs let me know too - but I'm happy to look after the conda recipe by myself if you'd rather not.

closing old processes

Hi,
After running my script that uses unsync (thank you so much for this great tool!!) for a couple of hours my os starts complaining about all the open processes. I read that setting unsync.process_executor = None would clean out old processes, but I'm getting AttributeError: can't set attribute when executing that line in my code.
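If unsync.process_executor really is a plain ProcessPoolExecutor, as the Quick Overview states, one hedged workaround is to shut the pool down explicitly once no more cpu_bound work will be submitted (submitting after this would fail):

from unsync import unsync

# ... after all cpu_bound results have been collected ...
unsync.process_executor.shutdown(wait=True)  # standard concurrent.futures API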

Support naive introspection for wrapped functions

It does not seem like the @unsync decorator can currently be used as part of a nested decorator chain.

For example, trying to use unsync in a pytest fixture:

@pytest.fixture
@unsync
async def my_test_fixture():
    pass

...fails with: AttributeError: 'unsync' object has no attribute '__name__'

This limits the range of scenarios that unsync can be used in.
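Until the decorator carries functools.wraps-style metadata, a hedged workaround is to keep @unsync off the fixture itself and call the unsynced function from a plain function whose __name__ pytest can introspect:

import asyncio
import pytest
from unsync import unsync

@unsync
async def _my_fixture_impl():
    await asyncio.sleep(0)
    return 'value'

@pytest.fixture
def my_test_fixture():
    # a plain def exposes __name__, so pytest's introspection succeeds
    return _my_fixture_impl().result()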

max_workers

Is there a possibility to set max_workers?

Best regards
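There is no documented option, but one possible sketch is to replace the executor before any @unsync call; note that the "closing old processes" issue above reports AttributeError when assigning to process_executor, so this may not work on every version:

from concurrent.futures import ThreadPoolExecutor
from unsync import unsync

# must happen before the first @unsync function is invoked
unsync.thread_executor = ThreadPoolExecutor(max_workers=4)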

Continuation with cpu_bound functions

Using a cpu_bound function in a continuation chain currently fails. Example:

from unsync import unsync
import asyncio
import time

@unsync()
async def download_data(url):
    await asyncio.sleep(1)
    return 'data'

@unsync(cpu_bound=True)
def process_data(task):
    data = task.result()
    time.sleep(1)
    return 'processed data'

@unsync()
def store_processed_data(task):
    data = task.result()
    time.sleep(1)
    print('Stored')

tasks = [
    download_data(url).then(process_data).then(store_processed_data)
    for url in ['url1', 'url2', 'url3']
]

[task.result() for task in tasks]

# TypeError: cannot pickle '_asyncio.Task' object

I think this is a fairly large drawback. It could easily be solved by passing the result() to the continuation rather than the Unfuture. If there are good reasons for passing the Unfuture, perhaps it could somehow be made picklable, or the unpicklable parts could be stripped / swapped out before passing it on.
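A hedged workaround in the meantime: insert a threaded shim that unwraps the result and passes only picklable data across the process boundary (process_data_raw is an illustrative name, not part of the library):

@unsync(cpu_bound=True)
def process_data_raw(data):
    # receives plain picklable data rather than an Unfuture/Task
    time.sleep(1)
    return 'processed data'

@unsync()
def process_data(task):
    # threaded shim: unwrap the result, then hand it to the process pool
    return process_data_raw(task.result()).result()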

Using @unsync decorator in Jupyter results in error

Running the example code, but inside a Jupyter notebook, results in the following error:
TypeError: 'module' object is not callable

Here's the code in a single Jupyter cell, running Python 3.7.7

import asyncio
import unsync

@unsync
async def unsync_async():
    await asyncio.sleep(1)
    return 'I like decorators'

unfuture1 = unsync_async()
unfuture2 = unsync_async()
print(unfuture1.result() + unfuture2.result())
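The TypeError indicates the module was imported instead of the decorator (compare the "Incorrect import in README" issue below); presumably the fix is:

from unsync import unsync  # import the decorator, not the module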

`BrokenProcessPool` error using function with `cpu_bound=True`

Code seems to follow the examples, and the unsynced function is in global scope. Is the fact that lifted is not global a problem? I tried making it global and it did not seem to help.

import unsync as un
from typing import Callable, List


def parallelize(f: Callable) -> Callable:
    def lifted(series):
        futures: List[un.Unfuture] = [f(i) for i in series]  # type:ignore
        vals = [f.result() for f in futures]
        return vals

    return lifted


import time


@un.unsync(cpu_bound=True)
def slow_inc(n):
    time.sleep(4)
    return n + 1


print(parallelize(slow_inc)([1, 2, 3]))

It's a long error trace included below, ending with: concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I don't think it's user error on my part.

(gpt1-py3.11) GPT1 ) /Users/desmond/Documents/code/AI-invest/GPT1/.venv/bin/python /Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 120, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 129, in _main
    prepare(preparation_data)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 240, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 291, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "<frozen runpy>", line 291, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "/Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py", line 23, in <module>
    print(parallelize(slow_inc)([1, 2, 3]))
  File "/Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py", line 7, in <listcomp>
    futures: List[un.Unfuture] = [f(i) for i in series]  # type:ignore
  File "/Users/desmond/Documents/code/AI-invest/GPT1/.venv/lib/python3.11/site-packages/unsync/unsync.py", line 78, in __call__
    future = unsync.process_executor.submit(
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 787, in submit
    self._adjust_process_count()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 746, in _adjust_process_count
    self._spawn_process()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/process.py", line 764, in _spawn_process
    p.start()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/context.py", line 288, in _Popen
    return Popen(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 158, in get_preparation_data
    _check_not_importing_main()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 138, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

[The traceback above repeats, interleaved, once per spawned worker; duplicates omitted.]

Traceback (most recent call last):
  File "/Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py", line 23, in <module>
    print(parallelize(slow_inc)([1, 2, 3]))
  File "/Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py", line 8, in lifted
    vals = [f.result() for f in futures]
  File "/Users/desmond/Documents/code/AI-invest/GPT1/src/unsync_bug.py", line 8, in <listcomp>
    vals = [f.result() for f in futures]
  File "/Users/desmond/Documents/code/AI-invest/GPT1/.venv/lib/python3.11/site-packages/unsync/unsync.py", line 144, in result
    return self.concurrent_future.result(*args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 456, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Future exception was never retrieved
future: <Future finished exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.')>

[The BrokenProcessPool traceback repeats once per submitted future; duplicates omitted.]
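The RuntimeError buried in the trace points at the standard multiprocessing fix on spawn-based platforms such as macOS and Windows: guard the entry point so spawned workers can re-import the module without re-running it. A minimal sketch against the reporter's code:

import time
import unsync as un

@un.unsync(cpu_bound=True)
def slow_inc(n):
    time.sleep(4)
    return n + 1

if __name__ == '__main__':  # required so spawned workers don't re-execute this block
    futures = [slow_inc(i) for i in [1, 2, 3]]
    print([f.result() for f in futures])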

process_executor.py example on Windows 7 orphans the multiprocess kernels

Hi,
Running the process_executor.py example on Windows does indeed create the parallel processes, as evidenced by Task Manager, but the processes do not close on their own.
(Tested using PyCharm, on a Windows 7 install, 64-bit OS.)

Have I misconfigured, or am I missing something, or is this a limitation?
I can indeed see that running the example creates the processes...

I would have thought each of the processes would have been closed/terminated after they completed.

*edit
Thinking/just realized: the onus is on me to set unsync.process_executor = None, isn't it?
(The benefit then, is that the processingPool is setup and ready to go again, if it needs to be used, right?)

p.s. Thank you for sharing your excellent work.

Python 3.9.x compatibility?

Hi @alex-sherman, thanks a lot for this great API. I just wanted to give it a quick test with Python 3.9.0, but I get the following error. Changing the number of concurrent processes does not change the error (even 1 gives the same error).

Code:

from unsync import unsync
import time


@unsync(cpu_bound=True)
def heavy_calculation(num: int) -> int:

    print(f"iteration {num} started")
    return num ** num


if __name__ == "__main__":

    jobs = [heavy_calculation(i) for i in range(100)]

    for job in jobs:
        print(job.result())

    print(time.perf_counter())

Error:

Traceback (most recent call last):
  File "/Users/bbb/OneDrive/Docs/-- BW  --/-- Dev/SANDBOX/t_unsync.py", line 35, in <module>
    print(job.result())
  File "/Users/bbb/OneDrive/Docs/-- BW  --/-- Dev/SANDBOX/sandbox/lib/python3.9/site-packages/unsync/unsync.py", line 117, in result
    return self.concurrent_future.result(*args, **kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 440, in result
    return self.__get_result()
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
KeyError: ('__main__', 'heavy_calculation')
(sandbox) bbb@Bariss-MacBook-Pro SANDBOX % /Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 5 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

Decorator problem

Take your example thread_executor.py and modify it for execution, thus:

import time
from unsync import unsync

# Convert synchronous functions into Unfutures to be executed in `unsync.executor`

@unsync
def non_async_function(seconds):
    time.sleep(seconds)
    return 'Run in parallel!'


def main():
    start = time.time()
    tasks = [non_async_function(0.1) for _ in range(10)]
    print([task.result() for task in tasks])
    print('Executed in {} seconds'.format(time.time() - start))

    # Use the decorator on existing functions

    unsync_sleep = unsync(time.sleep)

    start = time.time()
    tasks = [unsync_sleep(0.1) for _ in range(10)]
    print([task.result() for task in tasks])
    print('Executed in {} seconds'.format(time.time() - start))


if __name__ == '__main__':
    main()

Running this fails with:

$ python thread_executor.py 
['Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!', 'Run in parallel!']
Executed in 0.2030332088470459 seconds
Traceback (most recent call last):
  File "thread_executor.py", line 29, in <module>
    main()
  File "thread_executor.py", line 23, in main
    tasks = [unsync_sleep(0.1) for _ in range(10)]
  File "thread_executor.py", line 23, in <listcomp>
    tasks = [unsync_sleep(0.1) for _ in range(10)]
  File "/obscuredpath/venv/lib/python3.8/site-packages/unsync/unsync.py", line 45, in __call__
    self._set_func(args[0])
  File "/obscuredpath/venv/lib/python3.8/site-packages/unsync/unsync.py", line 38, in _set_func
    assert inspect.isfunction(func)
AssertionError

In test_thread_executor.py there is no test case for decorating an existing function with arguments, similar to this example, so the unit tests pass. This example, however, does not work. Or did I misunderstand something?
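For what it's worth, time.sleep is a C builtin, so inspect.isfunction() returns False for it; a hedged workaround is to wrap it in a plain Python function before handing it to unsync:

def _sleep(seconds):
    time.sleep(seconds)

unsync_sleep = unsync(_sleep)  # passes the inspect.isfunction assertion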

Incorrect import in README

The import should read

from unsync import unsync

not

import unsync

When I try running the sample code, I get an error

TypeError: 'module' object is not callable

Typing

It would be great if the IDE could understand that an unsynced method returns an Unfuture rather than the usual data. And it would be even sweeter if Unfuture were generic, such that we know which value .result() returns.

CPU Bound Decorator in Wrapped Functions

For context, I am a new user of unsync and am exploring its use for running concurrent code triggered from a tkinter GUI. I'm currently studying ways of getting data produced by concurrent operations back into tkinter's own event loop.
As part of that I've come across a problem with cpu bound functions.

In the following code the cpu_bound_2 function fails with this exception:


concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/process.py", line 243, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/Users/stephen/Documents/Coding/Scrapbook/env/lib/python3.9/site-packages/unsync/unsync.py", line 98, in _multiprocess_target
    return unsync.unsync_functions[func_name](*args, **kwargs)
KeyError: ('__main__', 'func')
"""

The above exception was the direct cause of the following exception:

KeyError: ('__main__', 'func')

The code:

import math
import sys
import time

from unsync import unsync


@unsync(cpu_bound=True)
def cpu_bound_1():
    for number in range(20_000_000):
        math.sqrt(number)
    print("Finishing cpu_bound_1")
    
    
def threadable():
    @unsync
    def func():
        time.sleep(3)
        print("Finishing threadable")
    return func


def cpu_bound_2():
    @unsync(cpu_bound=True)
    def func():
        for number in range(20_000_000):
            math.sqrt(number)
        print("Finishing cpu_bound_2")
    return func


def main():
    cpu_bound_1()
    threadable()()
    cpu_bound_2()()


if __name__ == '__main__':
    sys.exit(main())

Python 3.9 on macOS Big Sur 11.4.
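The KeyError shows that worker processes look functions up by (module, name), which only works if the decorator ran at import time; a hedged workaround is to define cpu_bound functions at module top level and have the factory return the already-decorated function:

@unsync(cpu_bound=True)
def _cpu_bound_2_impl():
    # top-level definition, so workers can resolve ('__main__', '_cpu_bound_2_impl')
    for number in range(20_000_000):
        math.sqrt(number)
    print("Finishing cpu_bound_2")

def cpu_bound_2():
    return _cpu_bound_2_impl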

Async generators

Scenarios like these are not currently supported:

@unsync
def my_gen(block_size):
    while block := slow_read(block_size):
        yield from block

async def my_consumer():
    async for item in my_gen(1000):
         print(f"Found item: {item}")

They will error out by not finding __aiter__. At first sight this is easy to add, and I was planning to just drop a PR and walk off, but there are a few design decisions here.

Should we just call next() on whatever thread in the thread pool as __anext__ is called? Or should we trap a thread for the lifetime of the iterator, detecting iterator abandonment using the weakref module? Should that be exposed as an option in the decorator?

Unclear how async methods should make use of unsync methods

You can't (from what I tried) await some_unsync().

Looked into it a bit and it seems to be a misunderstanding on my part. This library isn't actually meant to help with this. There was a comment on a reddit thread by the author that gave an example of 'async'ifying a requests.get call, but, being not natively async, it's near useless for use alongside normal async functions.

(I'm beginning to think async in Python is just hopeless.)

asherman.io unsync page typo, or not

Howdy,

This page has some amusing bits http://asherman.io/projects/unsync.html

but possibly this wasn't an intended joke

makes its own in a new thread whose soul purpose

Did you mean

makes its own in a new thread whose sole purpose

it kind of reads ok with 'soul', but I think Unfuture is more likely to have a soul, since it embodies the spirit of two Futures

Any support or alternative for `asyncio.Semaphore`?

Hi! Thanks for developing unsync, it's a great job. Since I am new to this module and to asyncio, I am wondering whether there is a way to apply asyncio.Semaphore while using @unsync? I wrote a program that retrieves data from an API with unsync, but somehow it went wrong like this:

File "C:\Users\Nature\Miniconda3\lib\asyncio\locks.py", line 92, in __aenter__
    await self.acquire()
  File "C:\Users\Nature\Miniconda3\lib\asyncio\locks.py", line 474, in acquire
    await fut
RuntimeError: Task <Task pending coro=<fetch_file() running at MyPyCode.py:84>> got Future <Future pending> attached to a different loop

After I remove asyncio.Semaphore, the program works fine but is unable to restrict the number of coroutines. The reason I use asyncio.Semaphore is to avoid a DoS attack.

Original Code

import os
from time import perf_counter
import datetime
import asyncio
import aiohttp
import aiofiles
from unsync import unsync

BASE_URL = "https://www.ebi.ac.uk/pdbe/api/pdb/entry/"
DEMO = [
    (BASE_URL+'summary/1a01', '1a01_summary.json'),
    (BASE_URL+'summary/2xyn', '2xyn_summary.json'),
    (BASE_URL+'summary/1miu', '1miu_summary.json'),
    (BASE_URL+'summary/2hev', '2hev_summary.json'),
    (BASE_URL+'summary/3g96', '3g96_summary.json')]

@unsync
async def download_file(url):
    print(f"[{datetime.datetime.now()}] Start to get file: {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                return await resp.read()
            elif resp.status == 404:
                return None
            else:
                mes = "code={resp.status}, message={resp.reason}, headers={resp.headers}"
                raise Exception(mes.format(resp=resp))


@unsync
async def save_file(path, data):
    print(f"[{datetime.datetime.now()}] Start to save file: {path}")
    async with aiofiles.open(path, 'wb') as jsonFile:
        await jsonFile.write(data)


@unsync
async def fetch_file(semaphore, url, path, rate):
    async with semaphore:
        data = await download_file(url)
        await asyncio.sleep(rate)
        if data is not None:
            await save_file(path, data)
            return path


def multi_tasks(workdir, concur_req: int, rate=1.5):
    semaphore = asyncio.Semaphore(concur_req)
    tasks = [fetch_file(semaphore, url, os.path.join(workdir, path), rate) for url, path in DEMO]
    for t in tasks:
        t.result()


if __name__ == "__main__":
    workdir = "./"
    t0 = perf_counter()
    multi_tasks(workdir, 4)
    elapsed = perf_counter() - t0
    print(f'downloaded in {elapsed}s')

Behavior (as mentioned above)

(base) PS C:\GitWorks\temp> python .\MyPyCode.py
[2020-01-16 22:08:59.948408] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1a01
[2020-01-16 22:08:59.972382] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2xyn
[2020-01-16 22:08:59.974339] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1miu
[2020-01-16 22:08:59.975377] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2hev
[2020-01-16 22:09:02.782357] Start to save file: C:\GitWorks\temp\1miu_summary.json
[2020-01-16 22:09:02.783426] Start to save file: C:\GitWorks\temp\2hev_summary.json
[2020-01-16 22:09:02.896178] Start to save file: C:\GitWorks\temp\2xyn_summary.json
[2020-01-16 22:09:02.932120] Start to save file: C:\GitWorks\temp\1a01_summary.json
Traceback (most recent call last):
  File ".\MyPyCode.py", line 60, in <module>
    multi_tasks(workdir, 4)
  File ".\MyPyCode.py", line 54, in multi_tasks
    t.result()
  File "C:\Users\Nature\Miniconda3\lib\site-packages\unsync\unsync.py", line 112, in result
    return self.future.result()
  File ".\MyPyCode.py", line 41, in fetch_file
    async with semaphore:
  File "C:\Users\Nature\Miniconda3\lib\asyncio\locks.py", line 92, in __aenter__
    await self.acquire()
  File "C:\Users\Nature\Miniconda3\lib\asyncio\locks.py", line 474, in acquire
    await fut
RuntimeError: Task <Task pending coro=<fetch_file() running at .\MyPyCode.py:41>> got Future <Future pending> attached to a different loop

Code that removes asyncio.Semaphore

Just modify the following functions; the rest of the code is the same as the original.

@unsync
async def fetch_file(url, path, rate):
    data = await download_file(url)
    await asyncio.sleep(rate)
    if data is not None:
        await save_file(path, data)
        return path

def multi_tasks(workdir, concur_req: int, rate=1.5):
    # semaphore = asyncio.Semaphore(concur_req)
    tasks = [fetch_file(url, os.path.join(
        workdir, path), rate) for url, path in DEMO]
    for t in tasks:
        t.result()

Behavior

(base) PS C:\GitWorks\temp> python .\MyPyCode.py
[2020-01-16 22:27:25.656445] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1a01
[2020-01-16 22:27:25.685366] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2xyn
[2020-01-16 22:27:25.686413] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1miu
[2020-01-16 22:27:25.688351] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2hev
[2020-01-16 22:27:25.712284] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/3g96
[2020-01-16 22:27:28.868901] Start to save file: C:\GitWorks\temp\1miu_summary.json
[2020-01-16 22:27:28.918699] Start to save file: C:\GitWorks\temp\2hev_summary.json
[2020-01-16 22:27:28.921433] Start to save file: C:\GitWorks\temp\1a01_summary.json
[2020-01-16 22:27:28.923428] Start to save file: C:\GitWorks\temp\2xyn_summary.json
[2020-01-16 22:27:28.947356] Start to save file: C:\GitWorks\temp\3g96_summary.json
downloaded in 3.3005259000000002s

It is good to see that the program runs very fast, but it risks a DoS attack as the number of coroutines grows.

Code that removes @unsync

Just modify the following functions; the rest of the code is the same as the original.

# @unsync
async def download_file(url):
    print(f"[{datetime.datetime.now()}] Start to get file: {url}")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            if resp.status == 200:
                return await resp.read()
            elif resp.status == 404:
                return None
            else:
                mes = "code={resp.status}, message={resp.reason}, headers={resp.headers}"
                raise Exception(mes.format(resp=resp))


# @unsync
async def save_file(path, data):
    print(f"[{datetime.datetime.now()}] Start to save file: {path}")
    async with aiofiles.open(path, 'wb') as jsonFile:
        await jsonFile.write(data)


# @unsync
async def fetch_file(semaphore, url, path, rate):
    async with semaphore:
        data = await download_file(url)
        await asyncio.sleep(rate)
        if data is not None:
            await save_file(path, data)
            return path


def multi_tasks(workdir, concur_req: int, rate=1.5):
    semaphore = asyncio.Semaphore(concur_req)
    '''
    tasks = [fetch_file(semaphore, url, os.path.join(
        workdir, path), rate) for url, path in DEMO]
    for t in tasks:
        t.result()
    '''
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*[loop.create_task(fetch_file(semaphore, url, os.path.join(workdir, path), rate))
                             for url, path in DEMO])
    loop.run_until_complete(tasks)

Behavior

(base) PS C:\GitWorks\temp> python .\MyPyCode.py
[2020-01-16 22:21:28.923504] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1a01
[2020-01-16 22:21:28.948484] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2xyn
[2020-01-16 22:21:28.949433] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/1miu
[2020-01-16 22:21:28.950435] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/2hev
[2020-01-16 22:21:32.499267] Start to save file: C:\GitWorks\temp\1a01_summary.json
[2020-01-16 22:21:32.499759] Start to save file: C:\GitWorks\temp\1miu_summary.json
[2020-01-16 22:21:32.507789] Start to get file: https://www.ebi.ac.uk/pdbe/api/pdb/entry/summary/3g96
[2020-01-16 22:21:32.542851] Start to save file: C:\GitWorks\temp\2xyn_summary.json
[2020-01-16 22:21:32.546636] Start to save file: C:\GitWorks\temp\2hev_summary.json
[2020-01-16 22:21:35.358457] Start to save file: C:\GitWorks\temp\3g96_summary.json
downloaded in 6.4380731s

Although it works well without unsync, I would like to apply unsync's mixing-methods feature in my job, which integrates threads and processes.

Environment

{
    "platform": "Win10",
    "python": "3.7.1", 
    "aiohttp": "3.6.2",
    "aiofiles": "0.4.0",
    "unsync": "1.2.1"
}
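A hedged sketch of a possible fix, given the "attached to a different loop" error: create the Semaphore inside an @unsync coroutine so it binds to unsync's single event loop instead of the main thread's loop:

@unsync
async def multi_tasks_async(workdir, concur_req, rate=1.5):
    semaphore = asyncio.Semaphore(concur_req)  # created on unsync.loop
    tasks = [fetch_file(semaphore, url, os.path.join(workdir, path), rate)
             for url, path in DEMO]
    return [await t for t in tasks]

multi_tasks_async("./", 4).result()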

Can't Import unsync on AWS Lambda

I am trying to use the unsync module with AWS Lambda, but every time I try to import unsync my Lambda execution fails with the following error.

"module initialization error: [Errno 38] Function not implemented"

@unsync(cpu_bound=True) Windows 10 64 error

Hello, running this code:

@unsync(cpu_bound=True)
def non_async_function(num):
    print('non async function %i' % num)
    return num, sum(i * i for i in range(num))

on Windows 10 Pro, 64-bit, x64-based processor, I receive an error:

RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

Calling nested results() on futures produces InvalidStateError

Running the following code produces an InvalidStateError. I built a simpler example below based on what I am trying to do. The idea is simple: an unsync function creates x number of tasks and returns the results of those tasks from inside the function. Either iterating through the list of tasks and calling result(), or passing them into result_processor(), raises an InvalidStateError.

However, if I return the tasks instead of processing them inside caculation_tasks() and process them at the top level (either through iteration and result(), or with result_processor(tasks)), it works just fine. Returning the tasks to the top level could be problematic, though, and the service I am testing has a lot of nested function calls.

import time
import asyncio

from unsync import unsync

@unsync
async def async_continuation(new_num):
    """A preliminary result processor we'll chain on to the original task
       This will get executed wherever the source task was executed, in this
       case one of the threads in the ThreadPoolExecutor"""
    await asyncio.sleep(0.1)
    return new_num * 2, new_num

@unsync
async def caculation_tasks():
    tasks = []
    for x in range(1, 10):
        tasks.append(async_continuation(x))
    return result_processor(tasks).result()
    # return [x.result() for x in tasks]

@unsync
async def result_processor(tasks):
    """An async result aggregator that combines all the results
       This gets executed in unsync.loop and unsync.thread"""
    output = {}
    for task in tasks:
        num, res = await task
        output[num] = res
    return output


start = time.time()
results = caculation_tasks().result()
print(results)
print('Executed in {} seconds'.format(time.time() - start))

btw the guys at PythonBytes told me about this library and I am excited to play with it.
https://pythonbytes.fm/episodes/show/184/too-many-ways-to-wait-with-await
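Since the README notes that Unfuture.result() called from within unsync.loop behaves like asyncio.Future.result (raising if the future is not done), a hedged fix is to await the nested Unfuture instead of calling result() inside the coroutine:

@unsync
async def caculation_tasks():
    tasks = [async_continuation(x) for x in range(1, 10)]
    return await result_processor(tasks)  # await instead of .result()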

Ability to limit concurrency?

This looks like a fantastic tool! Very interested in it. I'm trying to see if there's a way to limit concurrency while using this tool. For example, I'd like to make hundreds or thousands of API calls, each of which takes about 10 seconds, but I'd like to limit the load on the server and have at most about 20 calls running at a time. Is that something I could do with this tool? I couldn't find anything in the tests or documentation so I'm guessing no, but thought I'd check. If no, is that a reasonable feature request or would that use case mean I should choose a different tool?
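Nothing built in, as far as the docs show, but the asyncio.Semaphore pattern sketched under the "Any support or alternative for asyncio.Semaphore?" issue above should apply; a minimal hedged shape (do_api_call is illustrative):

@unsync
async def bounded_call(semaphore, arg):
    async with semaphore:  # at most N bodies run concurrently
        return await do_api_call(arg)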

Why not use unittest's assertRaises instead of pytest.raises?

In tests:

    def test_exception(self):
        class TestException(Exception):
            pass

        @unsync
        async def error():
            await asyncio.sleep(0.1)
            raise TestException

        with raises(TestException):  # pytest.raises
            error().result()

but self.assertRaises would do the same:

        with self.assertRaises(TestException):
            error().result()

I think it would be nice to not have any 3rd-party dependencies at all, even in tests. Or maybe there is some tricky difference?

CPU bound coroutines

Hi. Love unsync. I've always felt C#'s async / await implementation was much slicker than Python's and you bring much of that to Python with barely over 100 lines of code. Well done.

I do have one issue to report. If I have the following:

@unsync(cpu_bound=True)
async def compute_some():
    print("Computing {}...".format(process_info()))
    for _ in range(1, 10_000_000):
        math.sqrt(25 ** 25 + .01)

This runs on the asyncio loop even though cpu_bound=True. The reason is that async def takes higher priority.

I'd suggest one of two changes.

  1. Go ahead and run this in a separate multiprocess execution and just create an asyncio loop to run it.
  2. Raise an error saying cpu_bound cannot be paired with an async coroutine and instruct them to remove cpu_bound=True or change async def to def.

I'm partial to 1, but either would be better than ignoring the cpu_bound setting.

Thanks!

unsync replaces event loop for main thread

I don't know whether this is OK or not, but importing unsync changes the event loop for the current thread, so the event loops before and after the import are not the same. This can potentially cause problems, because any regular non-unsync coroutine will run on unsync's event loop.

In [1]: import asyncio

In [2]: original_loop = asyncio.get_event_loop()

In [3]: from unsync import unsync

In [4]: current_loop = asyncio.get_event_loop()

In [5]: original_loop == current_loop
Out[5]: False

In [6]: current_loop == unsync.loop
Out[6]: True
