
loky's Introduction


The homepage of joblib with user documentation is located at:

https://joblib.readthedocs.io

Getting the latest code

To get the latest code using git, simply type:

git clone https://github.com/joblib/joblib.git

If you don't have git installed, you can download a zip of the latest code: https://github.com/joblib/joblib/archive/refs/heads/main.zip

Installing

You can use pip to install joblib:

pip install joblib

from any directory or:

python setup.py install

from the source directory.

Dependencies

  • Joblib has no mandatory dependencies besides Python (supported versions are 3.8+).
  • Joblib has an optional dependency on Numpy (at least version 1.6.1) for array manipulation.
  • Joblib includes its own vendored copy of loky for process management.
  • Joblib can efficiently dump and load numpy arrays but does not require numpy to be installed (a short sketch follows after this list).
  • Joblib has an optional dependency on python-lz4 as a faster alternative to zlib and gzip for compressed serialization.
  • Joblib has an optional dependency on psutil to mitigate memory leaks in parallel worker processes.
  • Some examples require external dependencies such as pandas. See the instructions in the Building the docs section for details.
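As an illustration of the optional numpy support mentioned above, here is a minimal sketch of persisting a numpy array with joblib (this assumes numpy is installed; the file name is arbitrary):

import numpy as np
import joblib

# Persist a numpy array to disk; joblib handles large arrays efficiently.
data = np.arange(10)
joblib.dump(data, 'data.joblib')

# Load it back; the result is a regular numpy array.
restored = joblib.load('data.joblib')
assert (restored == data).all()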

Workflow to contribute

To contribute to joblib, first create an account on GitHub. Once this is done, fork the joblib repository to get your own copy, then clone it with 'git clone' on the computers where you want to work. Make your changes in your clone, push them to your GitHub account, test them on several computers, and when you are happy with them, send a pull request to the main repository.

Running the test suite

To run the test suite, you need pytest (version >= 3) and the coverage module. Run the test suite using:

pytest joblib

from the root of the project.

Building the docs

To build the docs you need to have sphinx (>=1.4) and some dependencies installed:

pip install -U -r .readthedocs-requirements.txt

The docs can then be built with the following command:

make doc

The html docs are located in the doc/_build/html directory.

Making a source tarball

To create a source tarball, e.g. for packaging or distribution, run the following command:

python setup.py sdist

The tarball will be created in the dist directory. This command will compile the docs, and the resulting tarball can be installed with no dependencies other than the Python standard library. You will need setuptools and sphinx.

Making a release and uploading it to PyPI

These commands are run only by the project manager to make a release and upload it to PyPI:

python setup.py sdist bdist_wheel
twine upload dist/*

Note that the documentation should automatically get updated at each git push. If that is not the case, try building the docs locally and resolve any doc build errors (in particular when running the examples).

Updating the changelog

Changes are listed in the CHANGES.rst file. They must be updated manually, but the following git command may be used to generate the lines:

git log --abbrev-commit --date=short --no-merges --sparse

loky's People

Contributors

aabadie, albertcthomas, basnijholt, chkoar, cj-wright, glemaitre, haim0n, jeremiedbb, lukasz-migas, massich, mgorny, moinnadeem, ogrisel, pierreglaser, pmav99, rth, schlerp, tommoral, uniontech-lilinjie


loky's Issues

Random deadlock in test_in_callback_submit_with_crash[func15-args15-UnpicklingError] at executor shutdown during test teardown

Observed on Python 2.7 (32-bit) under Windows (on AppVeyor):

tests/test_reusable_executor.py::TestExecutorDeadLock::test_in_callback_submit_with_crash[func15-args15-UnpicklingError] PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_callback_crash_on_submit PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_deadlock_kill PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_crash_races[1] PASSED
tests/test_reusable_executor.py::TestExecutorDeadLock::test_crash_races[2] PASSED
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Captured stderr ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:MainThread] Creating a new executor with max_workers=2 as the previous instance cannot be reused (broken).
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: []
[DEBUG:MainProcess:MainThread] shutting down executor <loky.reusable_executor.ReusablePoolExecutor object at 0x03E377B0>
[DEBUG:MainProcess:ThreadManager] shutting down
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-29 (2760) ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Python27\Lib\threading.py", line 774, in __bootstrap
    self.__bootstrap_inner()
  File "C:\Python27\Lib\threading.py", line 801, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 754, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\reduction.py", line 124, in _serve
    conn = _listener.accept()
  File "C:\Python27\Lib\multiprocessing\connection.py", line 145, in accept
    c = self._listener.accept()
  File "C:\Python27\Lib\multiprocessing\connection.py", line 365, in accept
    win32.ConnectNamedPipe(handle, win32.NULL)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of MainThread (2136) ~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Python27\Lib\runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "C:\Python27\Lib\runpy.py", line 72, in _run_code
    exec code in run_globals
  File "C:\projects\loky\.tox\py27\Scripts\py.test.EXE\__main__.py", line 9, in <module>
    sys.exit(main())
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\config.py", line 58, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 139, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 110, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 146, in _main
    config.hook.pytest_runtestloop(session=session)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\main.py", line 169, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 67, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 83, in runtestprotocol
    nextitem=nextitem))
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 157, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 177, in call_runtest_hook
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 191, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 177, in <lambda>
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 125, in pytest_runtest_teardown
    item.session._setupstate.teardown_exact(item, nextitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 474, in teardown_exact
    self._teardown_towards(needed_collectors)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 480, in _teardown_towards
    self._pop_and_teardown()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 440, in _pop_and_teardown
    self._teardown_with_finalization(colitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 458, in _teardown_with_finalization
    self._callfinalizers(colitem)
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\runner.py", line 448, in _callfinalizers
    fin()
  File "c:\projects\loky\.tox\py27\lib\site-packages\_pytest\python.py", line 465, in <lambda>
    return lambda: result(param_obj)
  File "C:\projects\loky\tests\_executor_mixin.py", line 181, in teardown_method
    executor = get_reusable_executor(max_workers=2)
  File "C:\projects\loky\loky\reusable_executor.py", line 106, in get_reusable_executor
    executor.shutdown(wait=True, kill_workers=kill_workers)
  File "C:\projects\loky\loky\process_executor.py", line 978, in shutdown
    self._queue_management_thread.join()
  File "C:\Python27\Lib\threading.py", line 940, in join
    self.__block.wait()
  File "C:\Python27\Lib\threading.py", line 340, in wait
    waiter.acquire()

Random test failure on Travis

=================================== FAILURES ===================================

________________________ TestLokyBackend.test_terminate ________________________

self = <tests.test_loky_backend.TestLokyBackend instance at 0x7fcb4baf59e0>

    def test_terminate(self):

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        assert p.is_alive()
        assert p in self.active_children()
        assert p.exitcode is None

        join = TimingWrapper(p.join)

        assert join(0) is None
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        assert p.is_alive()

        assert join(-1) is None
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        assert p.is_alive()

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                assert join() is None
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            assert join() is None

>       self.assertTimingAlmostEqual(join.elapsed, 0.0)

tests/test_loky_backend.py:148:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

t = 3.901387929916382, g = 0.0

    @staticmethod
    def assertTimingAlmostEqual(t, g):
>       assert round(t-g, 1) == 0
E       assert 3.9 == 0
E        +  where 3.9 = round((3.901387929916382 - 0.0), 1)

tests/test_loky_backend.py:233: AssertionError

Support frozen executable on Windows

If using PyInstaller to create an executable, the recommended way for multiprocessing is to use freeze_support:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

If we do not use this, the UI re-launches itself (like fork), but if I do use it on Windows I get an error:

Traceback (most recent call last):
File "gui\app.py", line 590, in <module>
File "multiprocessing\__init__.py", line 145, in freeze_support
File "multiprocessing\forking.py", line 336, in freeze_support
File "multiprocessing\forking.py", line 326, in is_forking
AssertionError

then

BrokenProcessPool: A process in the executor was terminated abruptly while the future was running or pending.

2018-04-27 16:54:24,895.895 [Dummy-1 ] util.py:78 info(): process shutting down

Also, this is the line that raises the assertion:

        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
            assert len(argv) == 3
            return True

This is the argv that is passed (the expected trailing argument is missing):

['D:\development\app\dist\app\app.exe', '--multiprocessing-fork']

I am wondering whether argv is different when using loky rather than multiprocessing, and what the equivalent/workaround is for loky.

Leaking semaphores on AWS Lambda

When using loky on AWS Lambda, I get error messages about leaked semaphores:

loky/backend/semaphore_tracker.py:158: UserWarning: semaphore_tracker: There appear to be 6 leaked semaphores to clean up at shutdown
len(cache))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Au1xgY': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Gzh15f': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-Tz7Xrd': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-zwUVYi': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-uODqzO': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))
loky/backend/semaphore_tracker.py:174: UserWarning: semaphore_tracker: '/loky-1-TfXo55': [Errno 38] ENOSYS
warnings.warn('semaphore_tracker: %r: %s' % (name, e))

It is probably the case that this is not supported on AWS Lambda, just as multiprocessing fails there. But even if it is not fixable, perhaps the incompatible platform could be detected early, already in get_reusable_executor or executor.map, so that a clear error message can be raised?
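One possible shape for such a check (a rough sketch, not loky's actual behavior; the helper name is hypothetical) is to probe whether a semaphore can be created at all before starting any worker, and raise an informative error otherwise:

import multiprocessing


def _check_semaphore_support():
    # On platforms without working POSIX semaphores (e.g. AWS Lambda),
    # creating a multiprocessing semaphore fails with OSError (ENOSYS).
    try:
        multiprocessing.Semaphore(1)
    except OSError as e:
        raise RuntimeError(
            "Process-based parallelism is not supported on this platform "
            "because named semaphores are unavailable. Original error: %r" % (e,)
        )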

Ensure that loky exceptions inherit from their concurrent.futures counterparts

Exceptions such as CancelledError and TimeoutError should be:

  • exposed in the top-level package namespace (to be part of the public API)
  • derived from their concurrent.futures counterparts, so that code can try/except using the concurrent.futures exceptions and the loky executor becomes a drop-in replacement (see the sketch after this list).
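A minimal sketch of the drop-in usage this would enable (assuming the loky exceptions do inherit from their concurrent.futures counterparts):

from concurrent.futures import TimeoutError
from loky import get_reusable_executor

executor = get_reusable_executor(max_workers=2)
future = executor.submit(pow, 2, 10)
try:
    # Catching the concurrent.futures exception would also catch the
    # loky-specific one once the inheritance is in place.
    print(future.result(timeout=1))
except TimeoutError:
    print("computation timed out")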

Running tests; contributing instructions

I was wondering what the expected way to run the tests is. I cannot find any documentation about it.

After installing loky with pip install -e ., running

pytest .

results in 3 test failures in tests/test_loky_backend.py (on Linux, Python 3.6), for instance

______________________________________________ TestLokyBackend.test_interactively_define_process_no_main[True] ______________________________________________

self = <tests.test_loky_backend.TestLokyBackend object at 0x7efc6f0aa2e8>, run_file = True
[...]
E               ValueError: Non-zero return code: 1.
E               Stdout:
E               
E               Stderr:
E               Traceback (most recent call last):
E                 File "/tmp/tmp5c1ijwmn_joblib.py", line 1, in <module>
E                   from loky.backend.process import LokyProcess
E               ImportError: No module named 'loky'

tests/utils.py:111: ValueError

The remaining 212 tests pass.

I can see that Travis CI uses tox, which calls pytest, but shouldn't this also work without tox?

Port robust ProcessPoolExecutor to `concurrent.futures`

Identify and port the robustifications to the upstream Python concurrent.futures.ProcessPoolExecutor.
See this repo.


Robustifications

  • Create an issue on the Python tracker. The PR should contain multiple standalone commits to allow easy discussion with the developers, and to permit cherry-picking what makes sense for upstream while discarding unnecessary changes.

    • Add a context concept for concurrent.futures.ProcessPoolExecutor to allow using forkserver and spawn as start methods for the workers. tomMoral/cpython@185c35d
    • Port the wakeup _Sentinel: this avoids deadlocks if a worker dies while holding the lock on result_queue. tomMoral/cpython@51102e7
    • Port thread_manager: makes it possible to detect a failure of the queue_manager_thread. tomMoral/cpython@2e0f811
    • Port robust shutdown: enhances the shutdown process to make sure the correct flags are used. tomMoral/cpython@47c39f1
    • Add worker timeout / max_tasks?

Tests

  • Add crash tests to tests/test_concurrent_futures.py to validate our code in each commit

Random "IOError: [Errno 9] Bad file descriptor" in CreatePipe under Windows

______________ TestsProcessPoolLokyExecutor.test_worker_timeout _______________
self = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x02DCE3C8>
    @pytest.mark.timeout(50 if sys.platform == "win32" else 25)
    def test_worker_timeout(self):
        self.executor.shutdown(wait=True)
        self.check_no_running_workers(patience=5)
        timeout = getattr(self, 'min_worker_timeout', .01)
        try:
            self.executor = self.executor_type(
                max_workers=4, context=self.context, timeout=timeout)
        except NotImplementedError as e:
            self.skipTest(str(e))
    
        for i in range(5):
            # Trigger worker spawn for lazy executor implementations
>           for result in self.executor.map(id, range(8)):
i          = 4
result     = 46628128
self       = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x02DCE3C8>
timeout    = 0.01
tests\_test_process_executor.py:626: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loky\process_executor.py:942: in map
    timeout=timeout)
loky\_base.py:576: in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
loky\process_executor.py:909: in submit
    self._ensure_executor_running()
loky\process_executor.py:887: in _ensure_executor_running
    self._adjust_process_count()
loky\process_executor.py:878: in _adjust_process_count
    p.start()
loky\backend\process.py:49: in start
    self._popen = self._Popen(self)
loky\backend\process.py:37: in _Popen
    return Popen(process_obj)
loky\backend\popen_loky_win32.py:43: in __init__
    rhandle, wfd = _winapi.CreatePipe(None, 0)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
args = (None, 0), rfd = -1, wfd = 8
_current_process = <_subprocess_handle object at 0x02DB0C80>
    @staticmethod
    def CreatePipe(*args):
        rfd, wfd = os.pipe()
        _current_process = win_api.GetCurrentProcess()
        rhandle = win_api.DuplicateHandle(
>           _current_process, msvcrt.get_osfhandle(rfd),
            _current_process, 0, True,
            win_api.DUPLICATE_SAME_ACCESS)
E       IOError: [Errno 9] Bad file descriptor
_current_process = <_subprocess_handle object at 0x02DB0C80>
args       = (None, 0)
rfd        = -1
wfd        = 8

Code cleanup

  • rename pool to executor whenever it makes sense
  • reorganize the source folder to emphasize which code is backported and which is specific to loky
  • fix pyflakes errors
  • import the public API in the top-level package namespace

loky.backend.semaphore_tracker.sem_unlink does not have same signature if coming from ctypes or _multiprocessing

  • _multiprocessing.sem_unlink takes str.
  • loky.backend.semlock.sem_unlink comes from ctypes and takes bytes.

It feels like some code was written with the ctypes variant in mind and raises an error when _multiprocessing.sem_unlink is called. The tests seem to only exercise loky.backend.semlock.sem_unlink.
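A rough sketch of a normalizing wrapper that would hide the discrepancy, whichever implementation is picked up (the helper name is hypothetical):

def sem_unlink_compat(sem_unlink, name):
    # _multiprocessing.sem_unlink expects str while the ctypes variant
    # expects bytes; try the given name first, then fall back to the other form.
    try:
        return sem_unlink(name)
    except TypeError:
        if isinstance(name, bytes):
            return sem_unlink(name.decode('ascii'))
        return sem_unlink(name.encode('ascii'))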

Context

This is an error I just saw in a joblib Travis build. Note this is with loky version 1.2.1.

E               /home/travis/build/joblib/joblib/joblib/externals/loky/backend/semaphore_tracker.py:195: UserWarning: semaphore_tracker: There appear to be 6 leaked semaphores to clean up at shutdown
E                 len(cache))
E               /home/travis/build/joblib/joblib/joblib/externals/loky/backend/semaphore_tracker.py:211: UserWarning: semaphore_tracker: b'/loky-5456-6haleho6': TypeError('argument 1 must be str, not bytes',)
E                 warnings.warn('semaphore_tracker: %r: %r' % (name, e)) 

Quickly looking at it, it seems like this is still in master. The code where the warning happens is here:
https://github.com/tomMoral/loky/blob/dec1c8144b12938dfe7bfc511009e12f25fd1cd9/loky/backend/semaphore_tracker.py#L203-L211

Random deadlock in current master ()

Current thread 0x00007fff7b73f000 (most recent call first):
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 312 in _empty_queue
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 298 in _help_stuff_finish
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 249 in _terminate_pool
  File "/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/util.py", line 207 in __call__
  File "/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 456 in terminate
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 118 in terminate
...

No other thread is running.

Deadlock on __exit__ with BrokenProcessPool

This code causes a deadlock:

from loky import ProcessPoolExecutor
  

class PickleError():
    def __reduce__(self):
        raise RuntimeError()


if __name__ == "__main__":
    with ProcessPoolExecutor() as e:
        e.map(id, [PickleError()])

This results from a race condition, where the executor is shutting down before it gets flagged as broken. This is somewhat linked to #71.

Random failure on Travis with `fork` processes

There is a random failure on Travis with forked processes.

Here, one of the forked processes (ForkProcess-208) never started. I think we should drop support for the fork backend as it is not robust by design. It would speed up the tests and also avoid confusing users about why this library is developed.

Other examples of the same bug: here with ForkProcess-206 in the first failure.

Exit -11 (segfault) in check_subprocess in test_interactively_define_executor_no_main

This job failed with a segfault on macOS with Python 2.7.

=================================== FAILURES ===================================
__________________ test_interactively_define_executor_no_main __________________
    def test_interactively_define_executor_no_main():
        # check that the init_main_module parameter works properly
        # when using -c option, we don't need the safeguard if __name__ ..
        # and thus test LokyProcess without the extra argument. For running
        # a script, it is necessary to use init_main_module=False.
        code = """if True:
            from loky import get_reusable_executor
            e = get_reusable_executor()
            e.submit(id, 42).result()
            print("ok")
        """
        cmd = [sys.executable]
        try:
            fid, filename = mkstemp(suffix="_joblib.py")
            os.close(fid)
            with open(filename, mode='wb') as f:
                f.write(code.encode('ascii'))
            cmd += [filename]
>           check_subprocess_call(cmd, stdout_regex=r'ok', timeout=10)
cmd        = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
code       = 'if True:\n        from loky import get_reusable_executor\n        e = get_reusable_executor()\n        e.submit(id, 42).result()\n        print("ok")\n    '
f          = <closed file '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py', mode 'wb' at 0x10ff52030>
fid        = 61
filename   = '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py'
tests/test_reusable_executor.py:551: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
cmd = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
timeout = 10, stdout_regex = 'ok', stderr_regex = None
    def check_subprocess_call(cmd, timeout=1, stdout_regex=None,
                              stderr_regex=None):
        """Runs a command in a subprocess with timeout in seconds.
    
        Also checks returncode is zero, stdout if stdout_regex is set, and
        stderr if stderr_regex is set.
        """
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE)
    
        def kill_process():
            warnings.warn("Timeout running {}".format(cmd))
            proc.kill()
    
        timer = threading.Timer(timeout, kill_process)
        try:
            timer.start()
            stdout, stderr = proc.communicate()
    
            if sys.version_info[0] >= 3:
                stdout, stderr = stdout.decode(), stderr.decode()
            if proc.returncode == -9:
                message = (
                    'Subprocess timeout after {}s.\nStdout:\n{}\n'
                    'Stderr:\n{}').format(timeout, stdout, stderr)
                raise TimeoutError(message)
            elif proc.returncode != 0:
                message = (
                    'Non-zero return code: {}.\nStdout:\n{}\n'
                    'Stderr:\n{}').format(
                        proc.returncode, stdout, stderr)
>               raise ValueError(message)
E               ValueError: Non-zero return code: -11.
E               Stdout:
E               
E               Stderr:
E               loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
E                 "increase its maximal value", RuntimeWarning)
E               loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value
E                 "increase its maximal value", RuntimeWarning)
cmd        = ['/Users/travis/build/tomMoral/loky/.tox/py27/bin/python2.7', '/var/folders/my/m6ynh3bn6tq06h7xr3js0z7r0000gn/T/tmpP_HFC1_joblib.py']
kill_process = <function kill_process at 0x10ff61f50>
message    = 'Non-zero return code: -11.\nStdout:\n\nStderr:\nloky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on ...maphore are broken on OSX, release might increase its maximal value\n  "increase its maximal value", RuntimeWarning)\n'
proc       = <subprocess.Popen object at 0x10fea1b90>
stderr     = 'loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value\n ...maphore are broken on OSX, release might increase its maximal value\n  "increase its maximal value", RuntimeWarning)\n'
stderr_regex = None
stdout     = ''
stdout_regex = 'ok'
timeout    = 10
timer      = <_Timer(Thread-19, stopped 123145319129088)>
tests/utils.py:111: ValueError
----------------------------- Captured stderr call -----------------------------
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 51 and name "/loky-2418-rdWIkd"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 52 and name "/loky-2418-8UDY1l"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 53 and name "/loky-2418-uUnsqk"
[DEBUG:LokyProcess-413:MainThread] Queue._after_fork()
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 56 and name "/loky-2418-ntIpgw"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 57 and name "/loky-2418-yZLFxo"
[DEBUG:LokyProcess-413:MainThread] recreated blocker with handle 48 and name "/loky-2418-O9Bx1q"
[INFO:LokyProcess-413:MainThread] child process calling self.run()
[DEBUG:LokyProcess-413:MainThread] worker started with timeout=10

Unstable TestLokyBackend::test_process on Travis

https://travis-ci.org/tomMoral/loky/jobs/230024584

Unfortunately there is no informative output in the Travis log:

tests/test_loky_backend.py::TestLokyBackend::test_process ERROR: InvocationError: '/home/travis/build/tomMoral/loky/.tox/py33/bin/py.test -vl --timeout=15 --maxfail=5'

___________________________________ summary ____________________________________
ERROR:   py33: commands failed

It could be the case that Travis has simply killed the main Python process.

Note that this test was successful when Travis was previously run on the PR, so I suspect this is a random failure.

Fix OpenMP test on AppVeyor

The Cython compilation does not use OpenMP to compile the parallel loop, as it is not installed on AppVeyor. This is out of the scope of #33 but should be fixed at some point.

Random failure in test_worker_timeout ([DEBUG:MainProcess:QueueManager] The executor is broken as at least one process terminated abruptly)

First observed on Windows with 64 bit Python 2.7 in #87:

[DEBUG/LokyProcess-210] recreated blocker with handle 16
[DEBUG/LokyProcess-210] recreated blocker with handle 20
[DEBUG/LokyProcess-210] Queue._after_fork()
[DEBUG/LokyProcess-210] recreated blocker with handle 36
[DEBUG/LokyProcess-210] recreated blocker with handle 40
[INFO/LokyProcess-210] child process calling self.run()
[DEBUG/LokyProcess-210] worker started with timeout=0.01
[INFO/LokyProcess-210] shutting down worker after timeout 0.010s
[INFO/LokyProcess-210] process shutting down
[DEBUG/LokyProcess-210] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-210] running the remaining "atexit" finalizers
[INFO/LokyProcess-210] process exiting with exitcode 0
[DEBUG:MainProcess:MainThread] Adjust process count : {1360: <LokyProcess(LokyProcess-213, started)>, 2580: <LokyProcess(LokyProcess-212, started)>, 2224: <LokyProcess(LokyProcess-210, stopped)>, 2172: <LokyProcess(LokyProcess-211, started)>}
[DEBUG:MainProcess:QueueManager] The executor is broken as at least one process terminated abruptly
-------------------------- Captured stderr teardown ---------------------------
[DEBUG/LokyProcess-211] Using default backend pickle for pickling.
[DEBUG/LokyProcess-211] recreated blocker with handle 16
[DEBUG/LokyProcess-211] recreated blocker with handle 20
[DEBUG/LokyProcess-211] Queue._after_fork()
[DEBUG/LokyProcess-211] recreated blocker with handle 44
[DEBUG/LokyProcess-211] recreated blocker with handle 52
[INFO/LokyProcess-211] child process calling self.run()
[DEBUG/LokyProcess-211] worker started with timeout=0.01
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-213
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-212
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-210
[DEBUG:MainProcess:QueueManager] terminate process LokyProcess-211
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: {}
================================== FAILURES ===================================
______________ TestsProcessPoolLokyExecutor.test_worker_timeout _______________
self = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x03569C38>
    @pytest.mark.timeout(50 if sys.platform == "win32" else 25)
    def test_worker_timeout(self):
        self.executor.shutdown(wait=True)
        self.check_no_running_workers(patience=5)
        timeout = getattr(self, 'min_worker_timeout', .01)
        try:
            self.executor = self.executor_type(
                max_workers=4, context=self.context, timeout=timeout)
        except NotImplementedError as e:
            self.skipTest(str(e))
    
        for i in range(5):
            # Trigger worker spawn for lazy executor implementations
>           for result in self.executor.map(id, range(8)):
i          = 2
result     = 44988960
self       = <tests.test_process_executor_loky.TestsProcessPoolLokyExecutor instance at 0x03569C38>
timeout    = 0.01
tests\_test_process_executor.py:638: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
loky\_base.py:584: in result_iterator
    yield future.result()
loky\_base.py:431: in result
    return self.__get_result()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future at 0x3573370 state=finished raised BrokenExecutor>
    def __get_result(self):
        if self._exception:
>           raise self._exception
E           BrokenExecutor: A process in the process pool was terminated abruptly while the future was running or pending.
self       = <Future at 0x3573370 state=finished raised BrokenExecutor>
loky\_base.py:382: BrokenExecutor

It seems we have a race condition in the QueueManager: it might detect process 211 as started but dead before it actually starts (the startup logs of 211 show up in the teardown phase of the test, after the queue manager has decided that one process was dead).

#88 has been fixed in the meantime, but this should be a different, Python 3-only problem.

test_interpreter_shutdown randomly fails with executor._flags.broken

https://travis-ci.org/tomMoral/loky/jobs/251070720#L414

_ ERROR at teardown of TestsProcessPoolSpawnShutdown.test_interpreter_shutdown _
self = <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>
method = <bound method TestsProcessPoolSpawnShutdown.test_interpreter_shutdown of <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>>

    def teardown_method(self, method):
        # Make sure is not broken if it should not be
        executor = getattr(self, 'executor', None)
        if executor is not None:
            assert hasattr(method, 'broken_pool') != (
>               not self.executor._flags.broken)
E           AssertionError

executor   = <loky.process_executor.ProcessPoolExecutor object at 0x7ff9691544a8>
method     = <bound method TestsProcessPoolSpawnShutdown.test_interpreter_shutdown of <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>>
self       = <tests.test_process_executor_spawn.TestsProcessPoolSpawnShutdown object at 0x7ff969162c88>

Race condition on OSX with fork and very short worker timeout

The test test_worker_timeout fails on OSX with Python 3.5 and fork when the timeout is too low (typically .01s).
There seem to be some race conditions that prevent the workers from starting properly: no log occurs for those failed workers, and this makes the test fail.

Callback errors are not caught

The callback mechanism only calls the function in the result_handler thread.
It doesn't catch errors.

The current pool detects the crash of the result_handler and thus flags all the remaining jobs with AbortedWorkerError with a message about the result_handler crashing.

This doesn't cause any deadlock but it can be unclear for the user.

  • Shall we create a different error type for crashes in the result_handler?
  • Shall we catch the error and do a better recovery process? This is what is done in 622d7f7 by overloading the result types. (A user-side workaround is sketched below.)
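In the meantime, a user-side workaround is to guard the callback itself so that exceptions never escape into the thread that dispatches the callbacks (a minimal sketch; the wrapper name is hypothetical):

import functools
import traceback


def safe_callback(callback):
    # Wrap a callback so that any exception it raises is logged instead of
    # crashing the dispatching thread.
    @functools.wraps(callback)
    def wrapper(*args, **kwargs):
        try:
            return callback(*args, **kwargs)
        except Exception:
            traceback.print_exc()
    return wrapper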

Unsafe list comprehension in QueueManager thread under Python 3

As seen in a randomly failing cron-test, seemingly causing a failure in test_worker_timeout.

[DEBUG/ForkServerProcess-221] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkServerProcess-221] running the remaining "atexit" finalizers
[INFO/ForkServerProcess-221] process exiting with exitcode 0
[DEBUG:MainProcess:ThreadManager] All workers timed out. Adjusting process count.
[DEBUG:MainProcess:MainThread] Adjust process count : {2939: <ForkServerProcess(ForkServerProcess-224, started)>, 2941: <ForkServerProcess(ForkServerProcess-226, started)>, 2942: <ForkServerProcess(ForkServerProcess-227, started)>, 2943: <ForkServerProcess(ForkServerProcess-228, started)>}
Exception in thread QueueManager:
Traceback (most recent call last):
  File "/opt/python/3.4.6/lib/python3.4/threading.py", line 911, in _bootstrap_inner
    self.run()
  File "/opt/python/3.4.6/lib/python3.4/threading.py", line 859, in run
    self._target(*self._args, **self._kwargs)
  File "/home/travis/build/tomMoral/loky/loky/process_executor.py", line 496, in _queue_management_worker
    worker_sentinels = [p.sentinel for p in processes.values()]
  File "/home/travis/build/tomMoral/loky/loky/process_executor.py", line 496, in <listcomp>
    worker_sentinels = [p.sentinel for p in processes.values()]
RuntimeError: dictionary changed size during iteration
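A straightforward fix (sketch only, with a hypothetical helper name) is to iterate over a snapshot of the process dictionary rather than a live view, since another thread may add or remove workers while the comprehension runs:

def _get_worker_sentinels(processes):
    # Take a snapshot of the values first: the dict can be mutated concurrently
    # by the thread that adjusts the worker count.
    return [p.sentinel for p in list(processes.values())]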

Random failure in test_shutdown_with_sys_exit_at_pickle

tests/test_process_executor_loky.py::TestsProcessPoolLokyShutdown::test_shutdown_with_sys_exit_at_pickle <- tests\_test_process_executor.py 
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Captured stderr ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
[DEBUG:MainProcess:MainThread] shutting down executor <loky.process_executor.ProcessPoolExecutor object at 0x043D0690>
[DEBUG/LokyProcess-33] Using default backend pickle for pickling.
[DEBUG/LokyProcess-33] recreated blocker with handle 20
[DEBUG/LokyProcess-33] recreated blocker with handle 24
[DEBUG/LokyProcess-33] Queue._after_fork()
[DEBUG/LokyProcess-33] recreated blocker with handle 28
[DEBUG/LokyProcess-33] recreated blocker with handle 32
[INFO/LokyProcess-33] child process calling self.run()
[DEBUG/LokyProcess-33] worker started with timeout=None
[DEBUG:MainProcess:QueueManager] queue management thread shutting down
[DEBUG:MainProcess:QueueManager] closing call_queue
[DEBUG:MainProcess:QueueManager] telling queue thread to quit
[DEBUG:MainProcess:QueueManager] joining processes
[DEBUG:MainProcess:QueueFeederThread] feeder thread got sentinel -- exiting
[INFO/LokyProcess-32] shutting down worker on sentinel
[INFO/LokyProcess-32] process shutting down
[DEBUG/LokyProcess-32] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-32] running the remaining "atexit" finalizers
[INFO/LokyProcess-32] process exiting with exitcode 0
[INFO/LokyProcess-35] shutting down worker on sentinel
[INFO/LokyProcess-35] process shutting down
[DEBUG/LokyProcess-35] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-35] running the remaining "atexit" finalizers
[INFO/LokyProcess-35] process exiting with exitcode 0
[INFO/LokyProcess-33] shutting down worker on sentinel
[INFO/LokyProcess-33] process shutting down
[DEBUG/LokyProcess-33] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-33] running the remaining "atexit" finalizers
[INFO/LokyProcess-33] process exiting with exitcode 0
[DEBUG:MainProcess:ThreadManager] shutting down
[DEBUG/LokyProcess-34] Using default backend pickle for pickling.
[DEBUG/LokyProcess-34] recreated blocker with handle 20
[DEBUG/LokyProcess-34] recreated blocker with handle 24
[DEBUG/LokyProcess-34] Queue._after_fork()
[DEBUG/LokyProcess-34] recreated blocker with handle 28
[DEBUG/LokyProcess-34] recreated blocker with handle 32
[INFO/LokyProcess-34] child process calling self.run()
[DEBUG/LokyProcess-34] worker started with timeout=None
[INFO/LokyProcess-34] shutting down worker on sentinel
[INFO/LokyProcess-34] process shutting down
[DEBUG/LokyProcess-34] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-34] running the remaining "atexit" finalizers
[INFO/LokyProcess-34] process exiting with exitcode 0
[DEBUG/LokyProcess-36] Using default backend pickle for pickling.
[DEBUG/LokyProcess-36] recreated blocker with handle 20
[DEBUG/LokyProcess-36] recreated blocker with handle 24
[DEBUG/LokyProcess-36] Queue._after_fork()
[DEBUG/LokyProcess-36] recreated blocker with handle 28
[DEBUG/LokyProcess-36] recreated blocker with handle 32
[INFO/LokyProcess-36] child process calling self.run()
[DEBUG/LokyProcess-36] worker started with timeout=None
[INFO/LokyProcess-36] shutting down worker on sentinel
[INFO/LokyProcess-36] process shutting down
[DEBUG/LokyProcess-36] running all "atexit" finalizers with priority >= 0
[DEBUG/LokyProcess-36] running the remaining "atexit" finalizers
[INFO/LokyProcess-36] process exiting with exitcode 0
[DEBUG:MainProcess:QueueManager] queue management thread clean shutdown of worker processes: {}
[DEBUG:MainProcess:MainThread] Queue.join_thread()
[DEBUG:MainProcess:MainThread] using context <loky.backend.context.LokyContext object at 0x03B0E670>
[DEBUG:MainProcess:MainThread] created semlock with handle 1244
[DEBUG:MainProcess:MainThread] ProcessPoolExecutor is setup
[DEBUG:MainProcess:MainThread] created semlock with handle 1576
[DEBUG:MainProcess:MainThread] created semlock with handle 1196
[DEBUG:MainProcess:MainThread] Queue._after_fork()
[DEBUG:MainProcess:MainThread] created semlock with handle 1352
[DEBUG:MainProcess:MainThread] Adjust process count : {1344: <LokyProcess(LokyProcess-37, started)>, 2056: <LokyProcess(LokyProcess-38, started)>, 1112: <LokyProcess(LokyProcess-39, started)>, 2172: <LokyProcess(LokyProcess-40, started)>}
[DEBUG:MainProcess:MainThread] _start_queue_management_thread called
[DEBUG:MainProcess:QueueManager] Queue._start_thread()
[DEBUG:MainProcess:QueueManager] doing self._thread.start()
[DEBUG:MainProcess:MainThread] _start_thread_management_thread called
[DEBUG:MainProcess:MainThread] shutting down executor <loky.process_executor.ProcessPoolExecutor object at 0x043D0370>
[DEBUG:MainProcess:QueueFeederThread] starting thread to feed data to pipe
[DEBUG/LokyProcess-39] Using default backend pickle for pickling.
[DEBUG:MainProcess:QueueManager] ... done self._thread.start()
[DEBUG:MainProcess:ThreadManager] shutting down
[DEBUG/LokyProcess-39] recreated blocker with handle 20
[DEBUG/LokyProcess-39] recreated blocker with handle 24
[DEBUG/LokyProcess-39] Queue._after_fork()
[DEBUG/LokyProcess-39] recreated blocker with handle 28
[DEBUG/LokyProcess-39] recreated blocker with handle 32
[INFO/LokyProcess-39] child process calling self.run()
[DEBUG/LokyProcess-39] worker started with timeout=None
[DEBUG/LokyProcess-38] Using default backend pickle for pickling.
[DEBUG/LokyProcess-38] recreated blocker with handle 20
[DEBUG/LokyProcess-38] recreated blocker with handle 24
[DEBUG/LokyProcess-38] Queue._after_fork()
[DEBUG/LokyProcess-38] recreated blocker with handle 28
[DEBUG/LokyProcess-38] recreated blocker with handle 32
[INFO/LokyProcess-38] child process calling self.run()
[DEBUG/LokyProcess-38] worker started with timeout=None
[DEBUG/LokyProcess-40] Using default backend pickle for pickling.
[DEBUG/LokyProcess-40] recreated blocker with handle 20
[DEBUG/LokyProcess-40] recreated blocker with handle 24
[DEBUG/LokyProcess-40] Queue._after_fork()
[DEBUG/LokyProcess-40] recreated blocker with handle 28
[DEBUG/LokyProcess-40] recreated blocker with handle 32
[INFO/LokyProcess-40] child process calling self.run()
[DEBUG/LokyProcess-40] worker started with timeout=None
[DEBUG/LokyProcess-37] Using default backend pickle for pickling.
[DEBUG/LokyProcess-37] recreated blocker with handle 32
[DEBUG/LokyProcess-37] recreated blocker with handle 36
[DEBUG/LokyProcess-37] Queue._after_fork()
[DEBUG/LokyProcess-37] recreated blocker with handle 48
[DEBUG/LokyProcess-37] recreated blocker with handle 52
[INFO/LokyProcess-37] child process calling self.run()
[DEBUG/LokyProcess-37] worker started with timeout=None
~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of QueueManager (2928) ~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "c:\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "C:\projects\loky\loky\process_executor.py", line 510, in _queue_management_worker
    timeout=_poll_timeout)
  File "c:\python36\Lib\multiprocessing\connection.py", line 859, in wait
    ready_handles = _exhaustive_wait(waithandle_to_obj.keys(), timeout)
  File "c:\python36\Lib\multiprocessing\connection.py", line 791, in _exhaustive_wait
    res = _winapi.WaitForMultipleObjects(L, False, timeout)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of Thread-26 (1468) ~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\threading.py", line 884, in _bootstrap
    self._bootstrap_inner()
  File "c:\python36\Lib\threading.py", line 916, in _bootstrap_inner
    self.run()
  File "c:\python36\Lib\threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "c:\python36\Lib\multiprocessing\resource_sharer.py", line 142, in _serve
    with self._listener.accept() as conn:
  File "c:\python36\Lib\multiprocessing\connection.py", line 453, in accept
    c = self._listener.accept()
  File "c:\python36\Lib\multiprocessing\connection.py", line 663, in accept
    [ov.event], False, INFINITE)
~~~~~~~~~~~~~~~~~~~~~~~~~~ Stack of MainThread (2068) ~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "c:\python36\Lib\runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "c:\python36\Lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "C:\projects\loky\.tox\py36\Scripts\py.test.EXE\__main__.py", line 9, in <module>
    sys.exit(main())
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\config.py", line 58, in main
    return config.hook.pytest_cmdline_main(config=config)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 134, in pytest_cmdline_main
    return wrap_session(config, _main)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 105, in wrap_session
    session.exitstatus = doit(config, session) or 0
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 141, in _main
    config.hook.pytest_runtestloop(session=session)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\main.py", line 164, in pytest_runtestloop
    item.config.hook.pytest_runtest_protocol(item=item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 60, in pytest_runtest_protocol
    runtestprotocol(item, nextitem=nextitem)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 73, in runtestprotocol
    reports.append(call_and_report(item, "call", log))
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 127, in call_and_report
    call = call_runtest_hook(item, when, **kwds)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 145, in call_runtest_hook
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 157, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 145, in <lambda>
    return CallInfo(lambda: ihook(item=item, **kwds), when=when)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\runner.py", line 98, in pytest_runtest_call
    item.runtest()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\python.py", line 1593, in runtest
    self.ihook.pytest_pyfunc_call(pyfuncitem=self)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 745, in __call__
    return self._hookexec(self, self._nonwrappers + self._wrappers, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 339, in _hookexec
    return self._inner_hookexec(hook, methods, kwargs)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 334, in <lambda>
    _MultiCall(methods, kwargs, hook.spec_opts).execute()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 613, in execute
    return _wrapped_call(hook_impl.function(*args), self.execute)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 248, in _wrapped_call
    call_outcome = _CallOutcome(func)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 265, in __init__
    self.result = func()
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\vendored_packages\pluggy.py", line 614, in execute
    res = hook_impl.function(*args)
  File "c:\projects\loky\.tox\py36\lib\site-packages\_pytest\python.py", line 142, in pytest_pyfunc_call
    testfunction(**testargs)
  File "C:\projects\loky\tests\_test_process_executor.py", line 106, in test_shutdown_with_sys_exit_at_pickle
    e.submit(id, ExitAtPickle())
  File "C:\projects\loky\loky\_base.py", line 609, in __exit__
    self.shutdown(wait=True)
  File "C:\projects\loky\loky\process_executor.py", line 987, in shutdown
    self._queue_management_thread.join()
  File "c:\python36\Lib\threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "c:\python36\Lib\threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
+++++++++++++++++++++++++++++++++++ Timeout ++++++++++++++++++++++++++++++++++++
ERROR: InvocationError: 'C:\\projects\\loky\\.tox\\py36\\Scripts\\py.test.EXE -vlx --timeout=30'
___________________________________ summary ___________________________________
ERROR:   py36: commands failed

Add loky.__version__?

I was surprised that loky does not have a __version__ attribute. I would say this is something I expect from most packages:

import loky
print(loky.__version__)

AttributeError: module 'loky' has no attribute '__version__'
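
A minimal sketch of what this could look like (the version string is a placeholder, not a real release number): define the attribute in loky/__init__.py and keep it in sync with setup.py.

# loky/__init__.py (sketch)
__version__ = '0.0.0.dev0'  # placeholder value, bumped at each release

With that in place, the snippet above would print the version instead of raising AttributeError.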

[Question] Consequence of broken semaphore on OSX?

Hello,
this is not an issue, but I'm simply trying to understand the consequences of this warning that appears on OSX (due to the missing sem_getvalue function):

loky/backend/semlock.py:217: RuntimeWarning: semaphore are broken on OSX, release might increase its maximal value

import time
from loky import ProcessPoolExecutor

def f(x):
    time.sleep(6)
    return x*x


if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=3) as executor:
        print("Started")
        results = executor.map(f, [1, 2, 3])
        print(list(results))
        print("Ended")

What is the worst-case outcome of this warning?

I also noticed that I could change process_executor.py to use a queue_size of 1 to avoid the warning:

        # Finally setup the queues for interprocess communication
        self._setup_queues(job_reducers, result_reducers, 1) #force a queue_size of 1

But I'm not sure what that would mean in terms of performance.

Love the work by the way, keep it up! ❤️

Random Deadlock

Random deadlock in tests/test_rpool.py::test_crashes.
Use $ ./launch_tests.sh to run the test repeatedly until it crashes.

Thread 0x00007f0020ecb700 (most recent call first):
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 379 in _recv
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 407 in _recv_bytes
  File "/usr/lib/python3.5/multiprocessing/connection.py", line 250 in recv
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 429 in _handle_results
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f001bfff700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/usr/lib/python3.5/queue.py", line 164 in get
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 376 in _handle_tasks
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f001affd700 (most recent call first):
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 367 in _handle_workers
  File "/usr/lib/python3.5/threading.py", line 862 in run
  File "/usr/lib/python3.5/threading.py", line 914 in _bootstrap_inner
  File "/usr/lib/python3.5/threading.py", line 882 in _bootstrap

Thread 0x00007f00280f4700 (most recent call first):
  File "/usr/lib/python3.5/threading.py", line 293 in wait
  File "/usr/lib/python3.5/threading.py", line 549 in wait
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 599 in wait
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 602 in get
  File "/usr/lib/python3.5/multiprocessing/pool.py", line 253 in apply
  File "/home/tom/Work/prog/github/RusePool/tests/test_rpool.py", line 168 in test_crashes
  File "/usr/lib/python3.5/site-packages/_pytest/python.py", line 286 in pytest_pyfunc_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/python.py", line 1406 in runtest
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 90 in pytest_runtest_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 264 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 247 in _wrapped_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 595 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 137 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 149 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 137 in call_runtest_hook
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 119 in call_and_report
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 75 in runtestprotocol
  File "/usr/lib/python3.5/site-packages/_pytest/runner.py", line 65 in pytest_runtest_protocol
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 264 in __init__
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 247 in _wrapped_call
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 595 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 146 in pytest_runtestloop
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 121 in _main
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 90 in wrap_session
  File "/usr/lib/python3.5/site-packages/_pytest/main.py", line 115 in pytest_cmdline_main
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 596 in execute
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 333 in <lambda>
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 338 in _hookexec
  File "/usr/lib/python3.5/site-packages/_pytest/vendored_packages/pluggy.py", line 724 in __call__
  File "/usr/lib/python3.5/site-packages/_pytest/config.py", line 48 in main
  File "/usr/bin/py.test", line 9 in <module>

Detect the number of usable CPU

Multiprocessing with n_jobs given by multiprocessing.cpu_count() is not optimal for systems where not all CPUs can be used (in particular Docker, Travis CI, etc.).

Following the discussion with @ogrisel, and as suggested in the multiprocessing docs, using len(os.sched_getaffinity(0)) might be better in the context of loky.
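
A minimal sketch of the idea (an illustration, not loky's actual implementation), falling back to cpu_count() on platforms without sched_getaffinity such as Windows and macOS:

import os
import multiprocessing


def usable_cpu_count():
    # Number of CPUs the current process is actually allowed to run on.
    try:
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # sched_getaffinity is not available on all platforms.
        return multiprocessing.cpu_count()


print(usable_cpu_count())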

Different error handling across build versions

The error handling in _handle_tasks differs across platforms/versions/builds. The errors are caught on 3.4.4+ but not on 3.4.2, for instance.
This should be fixed, as we need robust behavior for our use cases.
To fix it, the simplest way is probably to overload all the thread handlers:
_handle_tasks, _handle_workers & _handle_results.
For this, we need to rewrite the __init__ function and the worker function.
Does this make sense?

Test openMP compatibility

Add a test with Cython and OpenMP to ensure the compatibility of the loky backend with OpenMP and to highlight the crashes caused by fork.

Large overhead for very short tasks

In multiprocessing.Pool, there is one thread sending bytes to the workers and another thread receiving bytes from the workers. This is not the case with ProcessPoolExecutor, where a single thread handles both types of communication.

Therefore, ProcessPoolExecutor has a larger overhead when submitting a very large number of very short tasks. This could be solved by adding a dedicated thread for feeding the workers.
Here is a piece of code to reproduce the gap:

from itertools import repeat
from collections import defaultdict
from multiprocessing import get_context
from loky.reusable_executor import get_reusable_executor
from concurrent.futures import ProcessPoolExecutor

context_spawn = get_context("spawn")

N_ITER = 50000


def long_executor(get_executor, chunksize=1, **kwargs):
    # Submit many trivially short tasks through the concurrent.futures API.
    with get_executor(**kwargs) as executor:
        for _ in executor.map(id, repeat(0, N_ITER),
                              chunksize=chunksize):
            pass


def long_pool(get_pool, chunksize=1, **kwargs):
    # Same workload through the multiprocessing.Pool API.
    pool = get_pool(**kwargs)
    for _ in pool.map(id, repeat(0, N_ITER), chunksize=chunksize):
        pass
    pool.terminate()


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser('Program to launch the experiment')
    parser.add_argument('--run', type=str, default=None,
                        help="run 'loky', 'pool' or 'ccr'")
    parser.add_argument('--chunksize', type=int, default=1,
                        help='choose chunksize')

    args = parser.parse_args()
    max_workers = 8
    results = defaultdict(lambda: [])
    if args.run == "loky":
        long_executor(get_reusable_executor, max_workers=max_workers,
                      chunksize=args.chunksize)
    elif args.run == "pool":
        long_pool(context_spawn.Pool, processes=max_workers,
                  chunksize=args.chunksize)
    elif args.run == "ccr":
        long_executor(ProcessPoolExecutor, max_workers=max_workers,
                      chunksize=args.chunksize)
Crash of workers (SIGKILL)

A known bug of the Python pool is that when a worker crashes, the pool might end up in a broken state.
See the related Python bug report.

This case is critical for the reusable pool (we cannot allow it to happen and break several parts of the code). A minimal reproducer sketch is given after the list below.
This could be handled by:

  • Raising an error for all cached jobs when a worker crashes
  • Improving the _help_stuff_finish call to handle broken states and emptying the inqueue, which might be deadlocked.
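
A minimal reproducer sketch (POSIX only, since it relies on SIGKILL; the exact exception type returned for the pending job is an assumption about loky's behavior, not something stated in this issue):

import os
import signal
from loky import get_reusable_executor


def suicide():
    # Simulate a hard crash of the worker process.
    os.kill(os.getpid(), signal.SIGKILL)


if __name__ == '__main__':
    executor = get_reusable_executor(max_workers=2)
    future = executor.submit(suicide)
    # With a plain multiprocessing.Pool this kind of crash can leave the pool
    # hanging; the reusable executor should instead raise for the pending job.
    print(future.exception())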

Test semaphore and file descriptor management

Add unit tests to ensure that all open file descriptors and semaphores are opened/closed in the right process with the loky backend.
This involves:

  • Making sure fds are properly closed when a process is spawned with the loky backend.
  • Making sure semaphores are all closed once the locks are destroyed with the loky backend (python2.7/3.3)
  • Adding informative names for semaphores (python2.7/3.3)
  • Making sure the argument pipes are still open in child processes:
    this is already tested in test_process, as we use an mp.Queue to send and receive objects (a small sketch of such a check is given after this list)
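
A small sketch of such a check (a hypothetical test, assuming the loky context exposes Process and SimpleQueue):

from loky.backend import get_context


def _echo(queue, value):
    # Runs in the child: the queue's underlying pipe must still be usable here.
    queue.put(value)


def test_queue_still_open_in_child():
    ctx = get_context('loky')
    queue = ctx.SimpleQueue()
    p = ctx.Process(target=_echo, args=(queue, 42))
    p.start()
    assert queue.get() == 42
    p.join()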

Improve error message in case of failure in test_wait_result

Observed when running tox -e py27 on linux:

=================================================================================== FAILURES ===================================================================================
________________________________________________________________________ TestCondition.test_wait_result ________________________________________________________________________

self = <tests.test_synchronize.TestCondition instance at 0x7f754456ea70>

    @pytest.mark.skipif(sys.platform == "win32" and
                        sys.version_info[:2] < (3, 3),
                        reason="Condition.wait always returned None before 3.3"
                        " and we do not overload win32 Condition")
    def test_wait_result(self):
        if sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None
    
        c = loky_context.Condition()
        with c:
            assert not c.wait(0)
            assert not c.wait(0.1)
    
            p = loky_context.Process(target=self._test_wait_result,
                                     args=(c, pid))
            p.start()
    
            assert c.wait(10)
            if pid is not None:
                with pytest.raises(KeyboardInterrupt):
                    c.wait(10)
>           p.join()

c          = <Condition(<RLock(None, 0)>, 0)>
p          = <LokyProcess(LokyProcess-423, stopped)>
pid        = 20335
self       = <tests.test_synchronize.TestCondition instance at 0x7f754456ea70>

tests/test_synchronize.py:348: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
loky/backend/synchronize.py:249: in __exit__
    return self._lock.__exit__(*args)
loky/backend/synchronize.py:108: in __exit__
    return self._semlock.release()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <loky.backend.semlock.SemLock object at 0x7f75490656d0>

    def release(self):
        if self.kind == RECURSIVE_MUTEX:
            assert self._is_mine(), (
>               "attempt to release recursive lock not owned by thread")
E           AssertionError: attempt to release recursive lock not owned by thread

self       = <loky.backend.semlock.SemLock object at 0x7f75490656d0>

loky/backend/semlock.py:193: AssertionError
----------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780742144 and name "/loky-20335-L08GeF"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780738048 and name "/loky-20335-CO5Aui"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780733952 and name "/loky-20335-bGApZz"
[DEBUG:MainProcess:MainThread] created semlock with handle 140141780729856 and name "/loky-20335-0LIgU2"
[DEBUG:MainProcess:MainThread] launch python with cmd:
['/home/ogrisel/code/loky/.tox/py27/bin/python2.7', '-m', 'loky.backend.popen_loky_posix', '--name-process', 'LokyProcess-423', '--pipe', '19', '--semaphore', '12']
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780742144 and name "/loky-20335-L08GeF"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780738048 and name "/loky-20335-CO5Aui"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780733952 and name "/loky-20335-bGApZz"
[DEBUG:LokyProcess-423:MainThread] recreated blocker with handle 140141780729856 and name "/loky-20335-0LIgU2"
[INFO:LokyProcess-423:MainThread] child process calling self.run()
[INFO:LokyProcess-423:MainThread] process shutting down
[DEBUG:LokyProcess-423:MainThread] running all "atexit" finalizers with priority >= 0
[DEBUG:LokyProcess-423:MainThread] running the remaining "atexit" finalizers
[INFO:LokyProcess-423:MainThread] process exiting with exitcode 0
[INFO:LokyProcess-423:Dummy-1] process shutting down
[DEBUG:LokyProcess-423:Dummy-1] running all "atexit" finalizers with priority >= 0
[DEBUG:LokyProcess-423:Dummy-1] running the remaining "atexit" finalizers
========================================================= 1 failed, 132 passed, 5 skipped, 1 xpassed in 67.89 seconds ==========================================================
ERROR: InvocationError: '/home/ogrisel/code/loky/.tox/py27/bin/py.test -lv --maxfail=2 --timeout=10'

DOC Extend documentation with more examples and extended API

Extend the existing doc:

  • Add examples of the fixed deadlocks

    • after worker crash
    • after result unpickle errors
  • Add examples of using the reusable executor:

    • show resize capabilities
    • show auto respawn
    • show worker timeout
  • Document the behaviors of get_reusable_executor (a short usage sketch follows this list)

    • reuse=True only resizes the executor if needed.
    • if the arguments are changed and reuse is not True, a new executor is created
  • Tutorial on the concurrent.futures API and maybe some information on Thread vs Process
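
A short usage sketch of the behavior these examples could document (my reading of the resize-on-reuse behavior, to be checked against the actual implementation):

from loky import get_reusable_executor

# First call spawns the workers.
executor = get_reusable_executor(max_workers=2, timeout=10)
print(executor.submit(pow, 2, 10).result())

# A later call with a different max_workers should resize/respawn the same
# reusable executor rather than leak a second pool of workers.
executor = get_reusable_executor(max_workers=4, timeout=10)
print(executor.submit(pow, 2, 10).result())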

Random test failure on current master

====================================================================== FAILURES ======================================================================
____________________________________________________________ test_kill_after_resize_call _____________________________________________________________

exit_on_deadlock = None

    def test_kill_after_resize_call(exit_on_deadlock):
        """Test recovery if killed after resize call"""
        # Test the pool resizing called before a kill arrive
        pool = get_reusable_pool(processes=2)
        pool.apply_async(kill_friend, (pool._pool[1].pid, .1))
        pool = get_reusable_pool(processes=1)
        assert pool.apply(sleep_identity, ((1, 0.),)) == 1
>       pool.terminate()

tests/test_rpool.py:246:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
backend/reusable_pool.py:118: in terminate
    super(_ReusablePool, self).terminate()
/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py:456: in terminate
    self._terminate()
/usr/local/Cellar/python/2.7.8_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/util.py:207: in __call__
    res = self._callback(*self._args, **self._kwargs)
backend/reusable_pool.py:249: in _terminate_pool
    cls._help_stuff_finish(inqueue, outqueue, task_handler, len(pool))
backend/reusable_pool.py:299: in _help_stuff_finish
    _ReusablePool._empty_queue(outqueue, task_handler)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

queue = <multiprocessing.queues.SimpleQueue object at 0x109e80a10>, task_handler = <Thread(TaskHandler-18, stopped daemon 123145314922496)>

    @staticmethod
    def _empty_queue(queue, task_handler):
        """Empty a communication queue to ensure that maintainer threads will
            not hang forever.
            """
        # We use a timeout to detect queue that was locked by a dead
        # process and therefor will never be unlocked.
        if not queue._rlock.acquire(timeout=.1):
            mp.util.debug("queue is locked when terminating. "
                          "The pool might have crashed.")
        while task_handler.is_alive() and queue._reader.poll():
>           queue._reader.recv_bytes()
E           IOError: [Errno 22] Invalid argument

backend/reusable_pool.py:312: IOError

Random deadlock in current master

tests/test_rpool.py::test_deadlock_kill Timeout (0:00:05)!
Thread 0x00007fff7b73f000 (most recent call first):
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 383 in _recv
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 416 in _recv_bytes
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 216 in recv_bytes
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 312 in _empty_queue
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 299 in _help_stuff_finish
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 249 in _terminate_pool
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/util.py", line 185 in __call__
  File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 496 in terminate
  File "/Users/ogrisel/code/Rpool/backend/reusable_pool.py", line 118 in terminate
  File "/Users/ogrisel/code/Rpool/tests/test_rpool.py", line 265 in test_deadlock_kill

Add a benchmark script

To compare against multiprocessing.Pool and the original ProcessPoolExecutor under various versions of Python and on all platforms.

If the benchmark is short enough, it could be run after the test suite on Travis and AppVeyor to get an easy way to compare the outcomes on all the supported platforms.

Benchmarking (a rough sketch of such a script is given after this list):

  • setup speed for Process
  • setup speed for Executor
  • speed of variable transfer between processes (e.g. a large string)
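
A rough sketch of such a script (structure and names are illustrative; the setup timing of lazily started pools is only approximate):

import time
from concurrent.futures import ProcessPoolExecutor
from loky import get_reusable_executor

PAYLOAD = 'x' * (10 * 1024 * 1024)  # large string to measure transfer speed


def echo_len(data):
    return len(data)


def bench(make_executor, label):
    tic = time.time()
    with make_executor(max_workers=4) as executor:
        # Force the workers to actually start before measuring transfer time.
        executor.submit(echo_len, 'warmup').result()
        setup = time.time() - tic
        tic = time.time()
        executor.submit(echo_len, PAYLOAD).result()
        transfer = time.time() - tic
    print('{}: setup={:.3f}s transfer={:.3f}s'.format(label, setup, transfer))


if __name__ == '__main__':
    bench(ProcessPoolExecutor, 'concurrent.futures')
    bench(get_reusable_executor, 'loky')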
