Giter Club home page Giter Club logo

deco's People

Contributors

alex-sherman avatar cgxeiji avatar garrettwilkin avatar joshuapostel avatar pddenhar avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

deco's Issues

NameError: name 'concurrent' is not defined

This was my program.

@concurrent
def add_to_list(num):
    num = num + 1
    return num

x = []
@synchronized
def loop_to_add():
    for i in xrange(1, 100000):
        x.append(add_to_list(i))
    return x

loop_to_add()
print "Done"

It shows the error -


NameError Traceback (most recent call last)
in ()
1 #%%timeit
----> 2 @Concurrent
3 def add_to_list(num):
4 num = num + 1
5 return num
NameError: name 'concurrent' is not defined

I think I am doing something silly here. But I cannot seem to figure out what.

cannot pass **kwargs to concurrent?

I found this neat library super useful for speeding up (at least 10x from testing) Pandas read_csv for reading large files and concat the resulting data frames.

However, I found when reading in YAML config for passing **kwargs to read_csv, suddenly it doesn't work.

Example:

from typing import List
from pathlib import Path
from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, **kwargs) -> DataFrame:
    return read_csv(path, **kwargs)

@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, **kwargs))
    return concat(data)

Fails, but:

from typing import List
from pathlib import Path
from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, kwargs) -> DataFrame:
    return read_csv(path, **kwargs)
    
@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, kwargs))
    return concat(data)

works fine. Do the decorators not support **kwargs pass-throughs?

astutil.py line 68 `name = child.targets[0].value` "'Name' object has no attribute 'value'" (Py 3.4, 3.5)

On trying to adapt an existing program to use deco, I'm getting an error on the first (& only) call to the synchronized function:

my_fn( _list_of_dicts, _dict, _int1, _int2, _str )

with args of the indicated types. Traceback starting from that call is:

File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/conc.py", line 47, in __call__
  rewriter.visit(node.body[0])
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
  return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 85, in visit_FunctionDef
  self.generic_visit(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
  super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
  self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
  super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
  self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
  return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 68, in generic_visit
  name = child.targets[0].value
AttributeError: 'Name' object has no attribute 'value'

I haven't yet stripped things down to a more basic example.

Support for PyPy processes

It could be interesting to offload the parallel jobs to PyPy processes instead of the main Python interpreter, which would be mostly CPython if one wants to plot data, use Pandas or any other library that is not (yet) supported by PyPy. In this use case I see PyPy as a fast computation backend similar to Numba and its @jit decorator. Furthermore, Numpy arrays could be passed in parameters and handled by Numpypy, provided that pickling/unpickling is compatible between the two.
For it to work it is necessary to use multiprocessing.set_start_method('spawn') and set_executable() to set the interpreter to PyPy. Unfortunately this option of starting a fresh Python interpreter process is only available from Python 3.4, and PyPy only supports 3.3 for now. There is also this multiprocess fork of multiprocessing, which uses dill for a better serialization, so it could be worth integrating the py3.5 version of multiprocess into deco, so that set_start_method can be back-ported to an older version of Python and available in PyPy.
What do you think?

Assignment attempted on something that is not index based

I am using a dictionary to store the contents of two files to enable parallel loading of both file in a dictionary, yet I get an error:


@concurrent
def readfile(path):
    return codecs.open(path, encoding='utf-8').read()


@synchronized
def readdocs():
    d=dict()
    d['source']=readfile(r'./../accessmysql/t1')
    d['target']=readfile(r'./../accessmysql/t2')
    return d
if __name__ == '__main__':
    d=readdocs()

Giver error:
ValueError: Assignment attempted on something that is not index based

Implement free variable capture

Motivated by the use of module global variables, like logging.Logger instances, deco should be able to detect free variable reference in its concurrent functions and pass in/proxy any it encounters. This functionality should be optional, perhaps even allowing the specification of exactly which free variables to capture.

A simple test is the following:

from deco import *

global_var = False

@concurrent
def my_conc():
    return global_var

if __name__ == '__main__':
    global_var = True
    result = my_conc().get()
    print(result) #Should print True, but prints False unless using concurrent.threaded

wiki problems

  1. you def process_url and then call process_lat_lon
  2. will by modified typo

ValueError: If using all scalar values, you must pass an index

getting following error:

@concurrent
def createtraindata(filpath):
    print filpath

@synchronized
def iterate(fils):
    test={}
    for f in fils:
        test[f[0]]=createtraindata(f[0])
    return test


Traceback (most recent call last):                                                                                                                                                     
  File "createdata.py", line 183, in <module>                                                                                                                                          
    print iterate(fils)                                                                                                                                                                
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 58, in __call__                                                                                       
    return self.f(*args, **kwargs)                                                                                                                                                     
  File "<string>", line 257, in iterate                                                                                                                                                
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 129, in wait                                                                                          
    results.append(self.results.pop().get())                                                                                                                                           
  File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 144, in get                                                                                           
    result, operations = self.async_result.get()                                                                                                                                       
  File "/home/ttt/anaconda2/lib/python2.7/multiprocessing/pool.py", line 567, in get                                                                                              
    raise self._value                                                                                                                                                                  
ValueError: If using all scalar values, you must pass an index  

Question: do processes automatically stop when done?

Hi,

This looks like a great tool :-)

Just wondering about the memory performance of potentially spinning up lots of processes.

Also, once a set of processes are spun up to handle the users parallel requirements, do they get destroyed? is thatpossible or does that not make sense to do?

Hopefully you can help me understand the answer to these questions.

Thanks!

Logging Broke

I have logging in my @concurrent function and it doesn't seem to work, is it possible the logging level is not being changed for all the processes? When I do a print statement, it seems to work just fine with printing to screen. I'm using the logging module, I had the logger=logging.getLogger() as a global, then placed it into the function as well and it still doesn't work.

Support pluggable pool classes

As a deco user, I would like to be able to specify the type of pool to be used for concurrent work so that I can use the concurrency model (threads, processes, etc.) most suited to my workload.

For example, if I'm doing network IO bound work, I may want to use the multiprocessing.pool.ThreadPool rather than multiprocessing.Pool in order to avoid the overhead of creating new Python processes. I may want to add some features to Pool, say auditing & profiling, so I've created a subclass of Pool to add the features, and I would like to use it with deco.

The change would be to add a supported argument to concurrent decorator-- something like concurrent(pool_class = SomeClass). It appears that this will only require a change to the __init__ method of concurrent in order to check for the presence of this argument and set the instance local pool class, and a single line change to __call__-- if self.pool is None: self.pool = self.pool_class(....).

Does this sound reasonable to you all?

Please add a LICENSE file

Thanks for publishing this code, I would really like to play with it!
Sadly, without an explicit license, I can't really do so.

"a proper software license is a necessary evil for any code you plan to release to the public (...) Because I did not explicitly indicate a license, I declared an implicit copyright without explaining how others could use my code." Coding Horror

GH has made this easy.

@synchronized with generators

Because of the index-based limitation, this is currently not supported:

@synchronized
for thing in range(10):
    concurrent_function(thing)

Obviously there's a simple workaround in most cases to just use list(generator). Do you plan to support generators in the future or is this a race condition problem?

conc_test.py was eror in my python 3

here i was run conc_test.py

$ python conc_test.py
Traceback (most recent call last):
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 29, in
from _multiprocessing import SemLock, sem_unlink
ImportError: cannot import name 'SemLock'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "conc_test.py", line 29, in
test_size(SIZE)
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 61, in call
return self.f(*args, **kwargs)
File "", line 5, in test_size
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 110, in assign
self.assigns.append((target, self(*args, **kwargs)))
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 121, in call
self.concurrency = self.conc_constructor(*self.conc_args, **self.conc_kwargs)
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 156, in init
self._setup_queues()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 249, in _setup_queues
self._inqueue = self._ctx.SimpleQueue()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 112, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/queues.py", line 315, in init
self._rlock = ctx.Lock()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 66, in Lock
from .synchronize import Lock
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 34, in
" function, see issue 3770.")
ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.

what i forgoted to modul install?

Can deco decorates nested functions?

I try to do something like the below snippet since each time I want to use deco on a function f I have to define two functions, g with @concurrent and h with @synchronized. The return value of the function with @concurrent is 'decorated' so I need a g instead of putting @concurrent on f directly, and h is usually perform a for loop.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import numpy as np
import deco


def parallel(fn, items):

    @deco.concurrent
    def _concurrent(item):
        return fn(item)

    @deco.synchronized
    def _parallel(items):
        return [_concurrent(item) for item in items]

    return _parallel(items)


def f(x):
    return np.random.randn(x)


parallel(f, list(range(1000)))

However, I got the following error

Traceback (most recent call last):
  File "./test.py", line 25, in <module>
    parallel(f, list(range(1000)))
  File "./test.py", line 18, in parallel
    return _parallel(items)
  File "/usr/local/lib/python3.6/site-packages/deco/conc.py", line 61, in __call__
    return self.f(*args, **kwargs)
  File "<string>", line 3, in _parallel
  File "<string>", line 3, in <listcomp>
NameError: name '_concurrent' is not defined

Is it possible to use deco on nested functions? Thanks.

Synchronized concurrent method?

@concurrent
def preprocess(txt):
    txt = rep.replace(txt)
    txt = txt.split(u'\n')
    return txt


@concurrent
@synchronized
def readfile(path):
    txt= codecs.open(path, encoding='utf-8').read().strip()
    return preprocess(txt)

In the above code I want preprocess to be called individually hence even that is marked as concurrent and since I am using it in readfile I have marked it as both
@Concurrent
@synchronized

However error thrown :TypeError: unsupported operand type(s) for -: 'synchronized' and 'int'

Memory isn't freed when creating and returning large objects in concurrent function

First of all thanks for the great tool, it's really easy to use.

However I've found a strange behaviour when creating and returning relatively large structures (e.g. a list with 1000000) in the concurrent function and returning them to the synchronized function. The memory allocated for the list in the concurrent function is simply never freed again. I guess some reference is leftover maybe!? I've build a little stand-alone code to reproduce the problem:

The setup looks like the following:

import multiprocessing
from deco import concurrent, synchronized


@concurrent
def my_conc():
    tmp = range(1000000)
    return tmp


@synchronized
def my_sync(my_dict):
    new_dict = {}

    for key, value in my_dict.iteritems():
        new_dict[key] = my_conc()


def main():
    cpus = multiprocessing.cpu_count()
    my_dict = {}
    for i in xrange(cpus):
        my_dict[i] = 0

    for i in xrange(100):
        print i
        my_sync(my_dict)


if __name__ == '__main__':
    main()

So depending on the number of cpus I build n lists with 1000000 ints, and call the synchronized consecutively in a for loop. The allocated memory basically increases until all of it is used and my pc starts swapping...

As soon as I remove the decoraters everything works fine (although not concurrent) ;). Also this only happens if I return tmp in the my_conc() function. Once I replace it with 'return 0' everythings fine again.

I'm sorry if I misunderstood some limitation of the tool - it's my first time using parallel processing in python.

Thanks in advance!

setup.py problem (Python 3.5)

I downloaded the archive and thought I would install from the directory containing setup, in a Py3.5 virtual env. Just trying the--help option gives:

$ ./setup.py --help
from: can't read /var/mail/distutils.core
./setup.py: line 3: syntax error near unexpected token `newline'
./setup.py: line 3: `setup('

pip install deco works just fine, so this is a minor issue. However, the PyPI version of deco seems to be not up to date.

Example of a dynamic work load.

Is it possible to have your concurrent function accept new work items after the concurrent function has been started already? Perhaps with a work queue for instance?

I gather from the documentation and examples this is not possible at this time.

Nice work on a clean , simple way of enabling multiprocessing in a straight forward way!

Thank you,
Rob

High memory usage - how to reduce?

deco seems to make my program use more memory than I expected (regardless of the number of workers)

Let's say I have code like this (left out details for clarity)

@concurrent
def slow(index):
    ... do something

def run():
    for index in iterator_w_200K_items:
        slow(index)
    slow.wait()

It seems like the iterator is being read all the way through at once (and pending jobs created). So it's using too much memory. (To verify I replaced iterator_w_200K_items with iterator_w_2K_items and memory usage went way down.)

Is there a way I can have deco work in smaller sized chunks?

I hope that makes sense.

Decorators from deco cannot be used on class methods

Hey,

I know this because, well, it's not implemented.

However since standalone methods are objects in python just as class methods are, what are the main sticking point preventing that from being usable on class methods? Could we get some sort of "what needs to happen" for that to be possible?

I would see a real use for that on class methods. Say you have a DataObject that contains a Gig of data in numpy arrays or something. Maybe that object also defines operations that can be perform on its data. It would be great from a software architecture perspective to keep the object-oriented code and those methods in the class definition, but still be able to parallele process it like you allow it to be done on standalone methods.

Am I am making sense?

using futures.ascompleted construct

How is it possible to process the results like the futures.ascompleted construct , so as soon as the result is available it can be processed?

Instead of doing function decoration cant we do at lower granularity and mark the section of code rather then delegating it as a separate function?

Deco with pandas data structures

While experimenting with deco and pandas I was hoping that the code below would work.

The intention was to simulate parallel-processing of a dummy pandas.DataFrame, where vectorized implementation is supposedly not possible.

import pandas as pd
n = 1000
df = pd.DataFrame({'str': 'Row'+pd.Series(range(n)).astype(str),
                   'num': pd.np.random.randint(1,10,n)})
df.head()

Produces:

   num   str
0    3  Row0
1    8  Row1
2    1  Row2
3    2  Row3
4    9  Row4

Now trying to join cells in each row with:

from deco import *

@concurrent
def join_str(series, sep=', '):
    return sep.join(map(str, series))

@synchronized
def joiner(df, cols, sep=', '):
    joined = pd.Series(index=df.index)
    for row in df.index:
        joined[row] = join_str(df.loc[row, cols], sep=sep)
    return joined

joiner(df, ['str','num'])

Gives an error Assignment attempted on something that is not index based:

ValueError                                Traceback (most recent call last)
..........

D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
     38             return node.id
     39         elif type(node) is ast.Subscript:
---> 40             return SchedulerRewriter.subscript_name(node.value)
     41         raise ValueError("Assignment attempted on something that is not index based")
     42 

D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
     39         elif type(node) is ast.Subscript:
     40             return SchedulerRewriter.subscript_name(node.value)
---> 41         raise ValueError("Assignment attempted on something that is not index based")
     42 
     43     def is_concurrent_call(self, node):

ValueError: Assignment attempted on something that is not index based

Which is somewhat strange given that assignment is index based.

Is this because deco doesn't 'understand' this particular data structure (pandas.Series) or is there a problem in the code?

Python 3 support

Hi, just wondering if you plan to update this to work on Python 3?

Processor limit?

I've been testing Deco on a AWS instance with 36 cores and but Deco seems to max out at 16-18. Is there a limit to how many processes Deco can run?

Thanks
screen shot 2018-02-14 at 3 38 59 pm

Handle invalid access to @concurrent results

Loops like the following are currently not allowed in @synchronized functions, but will not throw a useful exception.

@synchronized
def run():
    x = []
    for i in range(...):
        x.append(conccurent(i))
    return x

Handling more cases of @concurrent result uses is a good idea, but more importantly users should be warned when their function is invalid.

IndentationError: unexpected indent

Python 3.5.1 / deco from pip installed from github

Test case:

from deco import concurrent, synchronized
import time


class DecoClass:

    @concurrent
    def work(self):
        time.sleep(0.1)

    @synchronized
    def run(self):
        for _ in range(100):
            self.work()

if __name__ == "__main__":
    start = time.time()
    x = DecoClass()
    x.run()
    print("Executing in serial should take 10 seconds")
    print("Executing in parallel took:", time.time() - start, "seconds")

The error message:

  File "<unknown>", line 1
    @synchronized
    ^
IndentationError: unexpected indent

generator argument

I'm doing some heavy work on csv and would like to pass a generator into the @concurrent function, but script just stalls, so when I swap it out for a list, it actually runs.

Version on PyPI seems not to import conc correcly

When installing the version available on PyPI the statement import conc on deco/__init__.py does not import deco.py correctly, eventually failing on usage. The master version seems to have corrected that issue.

It may be worth making a new release with this issue fixed.

Processes don't exit upon completion

I'm using deco with Ubuntu 16 and Python 3. When I run my script, I can successfully spin up 6 processes but they don't exit when the script is finished running. Next time I run it, it spins up another 6 processes. So in this example, I'll have 12+ instance of python running.

Is there a way to gracefully shut them down that I'm missing?

Issues With Hanging Processes / Restart

Hi,

Really love this idea, this is the way parallel processing should be :)

I'm having an issue where the same code will sometimes work as expected and create many processes, however, sometimes it appears it will get stuck with one process.

Wondering if anyone else is having/has had this issue and if there is a fix.

I've had this issue on osx, windows and through python and ipython qt console.

AttributeError: 'Tuple' object has no attribute 'value'

I'm using Python 3.4 on Ubuntu.

I'm testing out deco in a quick program I'm writing located here.

However, I'm getting this cryptic error:

runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')
Traceback (most recent call last):

  File "<ipython-input-4-b3482a262993>", line 1, in <module>
    runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')

  File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 586, in runfile
    execfile(filename, namespace)

  File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 48, in execfile
    exec(compile(open(filename, 'rb').read(), filename, 'exec'), namespace)

  File "/home/alexpetralia/Desktop/boston_events/main.py", line 71, in <module>
    x = populate_df()

  File "/usr/local/lib/python3.4/dist-packages/deco/conc.py", line 47, in __call__
    rewriter.visit(node.body[0])

  File "/usr/lib/python3.4/ast.py", line 245, in visit
    return visitor(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 85, in visit_FunctionDef
    self.generic_visit(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 55, in generic_visit
    super(NodeTransformer, self).generic_visit(node)

  File "/usr/lib/python3.4/ast.py", line 253, in generic_visit
    self.visit(item)

  File "/usr/lib/python3.4/ast.py", line 245, in visit
    return visitor(node)

  File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 68, in generic_visit
    name = child.targets[0].value

AttributeError: 'Tuple' object has no attribute 'value'

Any ideas what may be causing this?

Feature request: allow .append() to list instead of direct index assignment

The following code works:

@concurrent
def scrape(link):
    # do work
    pass

x = [None] * 100000 
@synchronized
def populate_df(links):
    for i, link in enumerate(links):
        x[i] = scrape(link)
    return x

However, this code does not:

@concurrent
def scrape(link):
    # do work
    pass

@synchronized
def populate_df(links):
    x = []
    for i, link in enumerate(links):
        x.append(scrape(link))
    return x

Instead it returns a list, one element for each element in the initial links list:

 <multiprocessing.pool.ApplyResult at 0x7f784ba424a8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42550>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba425f8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba426a0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42748>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba427f0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42898>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42940>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba429e8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42a90>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42b38>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42be0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42c88>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42d30>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42dd8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42e80>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42f28>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba42fd0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba480b8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48160>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48208>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba482b0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48358>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48400>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba484a8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48550>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba485f8>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba486a0>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba48748>,
 <multiprocessing.pool.ApplyResult at 0x7f784ba487f0>]

Would it be possible to get append to work?

KeyError with peterbe.com easy example on WinPython 3.6.3.0-64

I'm getting this in Jupyter notebok 5.7.8 with WinPython 3.6.8-64 on Windows 10. This is the first deco'd example from here


RemoteTraceback Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
File "c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py", line 10, in concWrapper
result = concurrent.functions[f](*args, **kwargs)
KeyError: 'slow'
"""

The above exception was the direct cause of the following exception:

KeyError Traceback (most recent call last)
in
4 slow(index)
5
----> 6 run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in call(self, *args, **kwargs)
59 exec(out, scope)
60 self.f = scope[self.orig_f.name]
---> 61 return self.f(*args, **kwargs)
62
63

in run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in wait(self)
134 results = []
135 while self.results:
--> 136 result, operations = self.results.pop().get()
137 self.apply_operations(operations)
138 results.append(result)

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in get(self)
154
155 def get(self):
--> 156 return self.async_result.get(3e+6)
157
158 def result(self):

c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py in get(self, timeout)
642 return self._value
643 else:
--> 644 raise self._value
645
646 def _set(self, i, obj):

KeyError: 'slow'

# after.py

from deco import concurrent, synchronized

@concurrent
def slow(index):
    time.sleep(5)

@synchronized
def run():
    for index in list('123'):
        slow(index)

run()

'Name' object is not iterable

Hey,

I am looking at deco and thought would be cool to use.

What I am doing is I have a parent thread who creates a csv writes headers to the file and then spawns child threads to go to a rest api and request data until there is no more data to request. At which point they return a list of results and the parent thread takes the results and writes them after the header

I keep hitting the same error. The error is a bit meaningless and there are no comments in the code so I cant make head or tails of what is going on?

Any thoughts?

Stack below:

Error
Traceback (most recent call last):
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 67, in testPartExecutor
yield
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 625, in run
testMethod()
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\freezegun\api.py", line 451, in wrapper
result = func(_args, *_kwargs)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\mock\mock.py", line 1305, in patched
return func(_args, *_keywargs)
File "D:\Projects\Python\dbexport\tests\test_dbexport.py", line 95, in test_something
export(database, output_dir=actual_dir)
File "D:\Projects\Python\dbexport\dbexport\dbexport.py", line 53, in export
export_dbrw(db_details, start_time, end_time, target_directory)
File "D:\Projects\Python\dbexport\dbexport\dbrw_export.py", line 129, in export_dbrw
run_dbrw_query(query.get('query_name'), bindings, write_headers, (csvwriter,), append_records, (csvwriter,))
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\conc.py", line 47, in call
rewriter.visit(node.body[0])
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 85, in visit_FunctionDef
self.generic_visit(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 57, in generic_visit
returns = [i for i, child in enumerate(node.body) if type(child) is ast.Return]
TypeError: 'Name' object is not iterable

Cheers, Nice piece of kit (When I did get it to work)

Nested concurrency

Is it possible to have a @Synchronised function whose body calls another function which is decorated with @Synchronised decorator?


from deco import *
from time import sleep
@concurrent
def bconc():
    sleep(3)
    return 1

@concurrent
def fconc():
    sleep(10)
    return 2
@synchronized
def bar():
    bd={}
    for i in range(5):
        bd[i]=bconc()
    return bd



@synchronized
def foo():
    bd=bar()
    fd={}
    for i in range(5):
        fd[i]=fconc()
    return fd,bd


print foo()

site-packages\deco\conc.py", line 57, in __call__
    out = compile(self.ast, "<string>", "exec")
TypeError: required field "lineno" missing from stmt

Google App Engine / Pandas Request Failing:

Hi guys,

I'm on Windows so forgive me in advance if that's the cause of any problems I'm also a beginner when it comes to async io and so I might just be misunderstanding something.

I'm making a request to the google app-engine (google analytics) using the pandas ga module, which uses OAuth to communicate with the analytics portion of the app engine.

Here's the code I had written:

import pandas.io.ga as ga
import pandas as pd
from deco import concurrent, synchronized
import time

@concurrent
def d_fetch(date, hour):
        t0 = time.time()
        data[str(date)+'h'+str(hour)] = [
            ga.read_ga(
            account_id  = "xxx",
            profile_id  = "xxx",
            property_id = "UA-xxx",
            metrics     = ['sessions','hits', 'bounces'],
            dimensions  = ['date', 'hour', 'minute', 'medium', 'keyword'],
            start_date  = date,
            end_date    = date,
            index_col = 0,
            filters = "hour==" + '{0:02d}'.format(hour))]
        t1 = time.time()
        data[str(date)+'h'+str(hour)].append(round(t1-t0,2))
        print str(date)+str(hour)+": completed in "+str(round(t1-t0,2))+" secs."

@synchronized
def run(data, dates):
    for date in dates:
        for hour in xrange(24):
            d_fetch(date, hour)

if __name__ == "__main__":
    somemute = {}
    date_range = pd.date_range(start='5/8/2016', end='5/8/2016', freq='D')

    t0 = time.time()
    run(somemute, date_range)
    t1 = time.time()
    print "TOOK", round(t1-t0,2)

And the error that was being raised:

image

Thanks!
Matt

dict object has no attribute iteritems

Hello,

Here is a program I quickly put together in order to play around with Deco:

import collections
import time
from deco import concurrent, synchronized

@concurrent
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    time.sleep(2)
    return x + y

@synchronized
def print_sum():
    results = collections.defaultdict(dict)
    results[1] = compute(1, 2)
    results[2] = compute(1, 2)
    results[3] = compute(1, 2)
    results[4] = compute(1, 2)
    final_result = results[1] + results[2] + results[3] + results[4]
    print("combined results {}".format(final_result))

print_sum()

Note: I know it's not great, but wanted something quick to play with

But I get the following error...

AttributeError: 'dict' object has no attribute 'iteritems'

...which suggests the code isn't suitable for Python 3 as per this Stack Overflow response to a similar error: http://stackoverflow.com/a/30418498

Am I mistaken, or missing something obvious?

Thanks.

cannot import concurrent

from PIL import Image
from deco import concurrent, synchronized
import time

@Concurrent
def slow(index):
time.sleep(5)

@synchronized
def run():
for index in list('123'):
slow(index)

run()

When I ran the code above, I get error msg below:
"C:\Program Files\Anaconda3\python.exe" C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py
Traceback (most recent call last):
File "C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py", line 2, in
from deco import concurrent, synchronized
File "C:\Users\zlan1\PycharmProjects\hellow\parallel_computing\deco.py", line 2, in
from deco import concurrent, synchronized
ImportError: cannot import name 'concurrent'

Process finished with exit code 1

Pass arguments of @concurrent to pool

Rather than making a bunch of redundant arguments for the @concurrent decorator, pass the arguments directly to pool's constructor.

This also avoids a Python 2.6 incompatability pointed out by @gst

function(concurrent) in synchronized which is passed by value can not work

hi,

guys, thanks for your great lib. I read it from blog https://www.peterbe.com/plog/deco

I have a question on usage of synchronized, below code works fine.
and I made small modification, it does not work now.
The reason I am doing so is that I want to pass function to run as a parameter.
I am wondering if you could kindly explain why.
Thanks!

Working version:

import time, requests

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0
  
@synchronized
def run(data):
    for url in urls:
        download(url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print ("TOOK", t1-t0)
print ("WOULD HAVE TAKEN", sum(data.values()), "seconds")

not working version:

import time, requests

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0
  
function_map = {
    'download': download
}

@synchronized
def run(data):
    for url in urls:
        function_map['download'](url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print ("TOOK", t1-t0)
print ("WOULD HAVE TAKEN", sum(data.values()), "seconds")

thread/process count

I just wanted to verify this, since it doesn't seem to be documented.

In conc_test.py I see the use of test.processes, does this set the max thread/process count for that specific concurrent function? What does the test.p = None refer too?

Bug with deco function call

The following example seems to not work:

@concurrent
def get_region(region_filepath, variables_to_get):
	return {}

@synchronized
def get_data(variable_list, regions):
	return_variables = {}
	for key in variable_list:
		return_variables[key] = []
	region_dicts = []
	for lat_lon in regions:
		region_dicts.append(get_region(root_dir + lat_lon, variable_list))
	#get_region.wait()
	print("len", len(region_dicts), regions)
	#consolidate all the regions
	for region in region_dicts:
		for key in region.keys():
			return_variables[key].extend(region[key])
	return return_variables

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.