alex-sherman / deco
License: MIT License
This was my program:

@concurrent
def add_to_list(num):
    num = num + 1
    return num

x = []

@synchronized
def loop_to_add():
    for i in xrange(1, 100000):
        x.append(add_to_list(i))
    return x

loop_to_add()
print "Done"
It shows this error:
NameError Traceback (most recent call last)
in ()
1 #%%timeit
----> 2 @Concurrent
3 def add_to_list(num):
4 num = num + 1
5 return num
NameError: name 'concurrent' is not defined
I think I am doing something silly here. But I cannot seem to figure out what.
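The error means the decorator names were never imported: the fix for the program above is to add `from deco import concurrent, synchronized` at the top. A minimal, deco-free sketch of why Python raises this at definition time:

```python
# Minimal reproduction: a decorator is just a name, so applying one that
# was never imported raises NameError when the def statement executes.
# The fix in the program above: from deco import concurrent, synchronized
try:
    @concurrent  # intentionally left undefined here
    def add_to_list(num):
        return num + 1
    error_message = None
except NameError as e:
    error_message = str(e)

print(error_message)  # name 'concurrent' is not defined
```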
I found this neat library super useful for speeding up Pandas read_csv (at least 10x in my testing) when reading large files and concatenating the resulting data frames.
However, I found that when reading in YAML config to pass **kwargs through to read_csv, it suddenly doesn't work.
Example:
from typing import List
from pathlib import Path

from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, **kwargs) -> DataFrame:
    return read_csv(path, **kwargs)

@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, **kwargs))
    return concat(data)
Fails, but:
from typing import List
from pathlib import Path

from pandas import DataFrame, read_csv, concat
from deco import concurrent, synchronized

@concurrent
def read_csv_worker(path: Path, kwargs) -> DataFrame:
    return read_csv(path, **kwargs)

@synchronized
def read_csv_dispatcher(paths: List[Path], **kwargs) -> DataFrame:
    data = list()
    for p in paths:
        data.append(read_csv_worker(p, kwargs))
    return concat(data)
works fine. Do the decorators not support **kwargs pass-through?
While trying to adapt an existing program to use deco, I'm getting an error on the first (and only) call to the synchronized function:
my_fn( _list_of_dicts, _dict, _int1, _int2, _str )
with args of the indicated types. Traceback starting from that call is:
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/conc.py", line 47, in __call__
rewriter.visit(node.body[0])
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 85, in visit_FunctionDef
self.generic_visit(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 253, in generic_visit
self.visit(item)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/ast.py", line 245, in visit
return visitor(node)
File "~/_VENVs/resizer-with-deco/lib/python3.5/site-packages/deco/astutil.py", line 68, in generic_visit
name = child.targets[0].value
AttributeError: 'Name' object has no attribute 'value'
I haven't yet stripped things down to a more basic example.
It could be interesting to offload the parallel jobs to PyPy processes instead of the main Python interpreter, which would mostly be CPython if one wants to plot data, use Pandas, or any other library that is not (yet) supported by PyPy. In this use case I see PyPy as a fast computation backend, similar to Numba and its @jit decorator. Furthermore, NumPy arrays could be passed in parameters and handled by NumPyPy, provided that pickling/unpickling is compatible between the two.
For it to work it is necessary to use multiprocessing.set_start_method('spawn') and set_executable() to set the interpreter to PyPy. Unfortunately this option of starting a fresh Python interpreter process is only available from Python 3.4, and PyPy only supports 3.3 for now. There is also the multiprocess fork of multiprocessing, which uses dill for better serialization, so it could be worth integrating the py3.5 version of multiprocess into deco, so that set_start_method can be back-ported to an older version of Python and made available in PyPy.
What do you think?
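For reference, the two stdlib knobs this proposal relies on already exist; a sketch of wiring them up (the PyPy path below is a placeholder, not a real install location):

```python
# Sketch: 'spawn' starts each worker in a fresh interpreter process, and
# set_executable() would choose which interpreter that is. The PyPy path
# is a hypothetical example, so the call is left commented out.
import multiprocessing as mp

ctx = mp.get_context("spawn")
# mp.set_executable("/usr/local/bin/pypy3")  # hypothetical PyPy binary

print(ctx.get_start_method())  # spawn
```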
I am using a dictionary to store the contents of two files, to enable parallel loading of both files into a dictionary, yet I get an error:
import codecs

@concurrent
def readfile(path):
    return codecs.open(path, encoding='utf-8').read()

@synchronized
def readdocs():
    d = dict()
    d['source'] = readfile(r'./../accessmysql/t1')
    d['target'] = readfile(r'./../accessmysql/t2')
    return d

if __name__ == '__main__':
    d = readdocs()
It gives this error:
ValueError: Assignment attempted on something that is not index based
Motivated by the use of module-global variables, like logging.Logger instances, deco should be able to detect free variable references in its concurrent functions and pass in/proxy any it encounters. This functionality should be optional, perhaps even allowing the specification of exactly which free variables to capture.
A simple test is the following:
from deco import *

global_var = False

@concurrent
def my_conc():
    return global_var

if __name__ == '__main__':
    global_var = True
    result = my_conc().get()
    print(result)  # Should print True, but prints False unless using concurrent.threaded
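Until such capture exists, one workaround is to pass the value in as an explicit argument instead of relying on a rebound global being visible in the workers. A stdlib sketch (a thread pool is used only to keep the example self-contained):

```python
# Workaround sketch: pass the value as an argument rather than reading a
# module global that worker processes may not see after it is rebound.
from multiprocessing.pool import ThreadPool

def my_conc(flag):
    return flag

with ThreadPool(2) as pool:
    result = pool.apply(my_conc, (True,))
print(result)  # True
```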
A typo in the README: where it says to define process_url and then call process_lat_lon, the text "will by modified" should read "will be modified".

I'm not exactly sure if this is a problem on my end or with this library, but I can't seem to find the solution.
I'm getting the following error:
@concurrent
def createtraindata(filpath):
    print filpath

@synchronized
def iterate(fils):
    test = {}
    for f in fils:
        test[f[0]] = createtraindata(f[0])
    return test
Traceback (most recent call last):
File "createdata.py", line 183, in <module>
print iterate(fils)
File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 58, in __call__
return self.f(*args, **kwargs)
File "<string>", line 257, in iterate
File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 129, in wait
results.append(self.results.pop().get())
File "/home/ttt/anaconda2/lib/python2.7/site-packages/deco/conc.py", line 144, in get
result, operations = self.async_result.get()
File "/home/ttt/anaconda2/lib/python2.7/multiprocessing/pool.py", line 567, in get
raise self._value
ValueError: If using all scalar values, you must pass an index
Hi,
This looks like a great tool :-)
Just wondering about the memory performance of potentially spinning up lots of processes.
Also, once a set of processes is spun up to handle the user's parallel requirements, do they get destroyed? Is that possible, or does that not make sense to do?
Hopefully you can help me understand the answers to these questions.
Thanks!
I have logging in my @concurrent function and it doesn't seem to work; is it possible the logging level is not being changed for all the processes? When I use a print statement instead, it works just fine, printing to screen. I'm using the logging module. I had logger = logging.getLogger() as a global, then placed it into the function as well, and it still doesn't work.
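One common cause: worker processes don't inherit handler or level configuration applied in the parent after the pool starts (especially under 'spawn'). The usual stdlib fix is to configure logging in a pool initializer, sketched below with a thread pool so the example is self-contained; deco itself doesn't expose this hook as far as I know:

```python
# Sketch: configure logging once per worker via the pool initializer.
import logging
from multiprocessing.pool import ThreadPool

def init_logging(level):
    # Runs once per worker; with real processes this is where handlers
    # and levels must be (re)configured, since they are not inherited.
    logging.basicConfig(level=level)

def worker(n):
    logging.getLogger(__name__).info("processing %s", n)
    return n

with ThreadPool(2, initializer=init_logging, initargs=(logging.INFO,)) as pool:
    out = pool.map(worker, [1, 2, 3])
print(out)  # [1, 2, 3]
```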
As a deco user, I would like to be able to specify the type of pool to be used for concurrent work so that I can use the concurrency model (threads, processes, etc.) most suited to my workload.
For example, if I'm doing network IO bound work, I may want to use the multiprocessing.pool.ThreadPool rather than multiprocessing.Pool in order to avoid the overhead of creating new Python processes. I may want to add some features to Pool, say auditing & profiling, so I've created a subclass of Pool to add the features, and I would like to use it with deco.
The change would be to add a supported argument to the concurrent decorator, something like concurrent(pool_class=SomeClass). It appears this will only require a change to the __init__ method of concurrent, to check for the presence of this argument and set the instance-local pool class, plus a single-line change to __call__: if self.pool is None: self.pool = self.pool_class(....).
Does this sound reasonable to you all?
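As a sketch of the kind of subclass this proposal would enable (illustrative only; deco does not currently accept a pool_class argument, and a thread pool keeps the example self-contained):

```python
# Illustrative Pool subclass adding simple auditing: count tasks as
# they are submitted. With the proposed pool_class argument, this is
# the sort of class one could hand to @concurrent.
from multiprocessing.pool import ThreadPool

class AuditingPool(ThreadPool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.submitted = 0

    def apply_async(self, *args, **kwargs):
        self.submitted += 1  # audit hook: one count per submitted task
        return super().apply_async(*args, **kwargs)

pool = AuditingPool(2)
result = pool.apply_async(int, ("7",)).get()
pool.close()
pool.join()
print(result, pool.submitted)  # 7 1
```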
Thanks for publishing this code, I would really like to play with it!
Sadly, without an explicit license, I can't really do so.
"a proper software license is a necessary evil for any code you plan to release to the public (...) Because I did not explicitly indicate a license, I declared an implicit copyright without explaining how others could use my code." Coding Horror
GH has made this easy.
Because of the index-based limitation, this is currently not supported:
@synchronized
for thing in range(10):
    concurrent_function(thing)
Obviously there's a simple workaround in most cases: just use list(generator). Do you plan to support generators in the future, or is this a race condition problem?
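The list(generator) workaround mentioned above, spelled out (generators have no index-based access, which is presumably why deco's rewriter rejects them):

```python
# Workaround sketch: materialize the generator into a concrete,
# index-based sequence before the synchronized loop consumes it.
gen = (i * i for i in range(5))
assert not hasattr(gen, '__getitem__')  # generators are not indexable

items = list(gen)
print(items)  # [0, 1, 4, 9, 16]
```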
Great library! Ideally I'd like a way for an error to stop the whole program. Is that possible?
Here is what happened when I ran conc_test.py:
$ python conc_test.py
Traceback (most recent call last):
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 29, in <module>
from _multiprocessing import SemLock, sem_unlink
ImportError: cannot import name 'SemLock'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "conc_test.py", line 29, in <module>
test_size(SIZE)
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 61, in __call__
return self.f(*args, **kwargs)
File "", line 5, in test_size
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 110, in assign
self.assigns.append((target, self(*args, **kwargs)))
File "/storage/emulated/0/Download/termuxData/deco/deco/conc.py", line 121, in __call__
self.concurrency = self.conc_constructor(*self.conc_args, **self.conc_kwargs)
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 119, in Pool
context=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 156, in __init__
self._setup_queues()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/pool.py", line 249, in _setup_queues
self._inqueue = self._ctx.SimpleQueue()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 112, in SimpleQueue
return SimpleQueue(ctx=self.get_context())
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/queues.py", line 315, in __init__
self._rlock = ctx.Lock()
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/context.py", line 66, in Lock
from .synchronize import Lock
File "/data/data/com.termux/files/usr/lib/python3.6/multiprocessing/synchronize.py", line 34, in <module>
" function, see issue 3770.")
ImportError: This platform lacks a functioning sem_open implementation, therefore, the required synchronization primitives needed will not function, see issue 3770.
What module did I forget to install?
Love the project; it saves me a lot of code.
I was just wondering if I can manually specify how many processes I want to allocate.
Sorry if the answer is somewhere already, but I did not find it.
I tried to do something like the snippet below, since each time I want to use deco on a function f I have to define two functions: g with @concurrent and h with @synchronized. The return value of the function with @concurrent is 'decorated', so I need a g instead of putting @concurrent on f directly, and h usually performs a for loop.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import numpy as np
import deco

def parallel(fn, items):
    @deco.concurrent
    def _concurrent(item):
        return fn(item)

    @deco.synchronized
    def _parallel(items):
        return [_concurrent(item) for item in items]

    return _parallel(items)

def f(x):
    return np.random.randn(x)

parallel(f, list(range(1000)))
However, I got the following error
Traceback (most recent call last):
File "./test.py", line 25, in <module>
parallel(f, list(range(1000)))
File "./test.py", line 18, in parallel
return _parallel(items)
File "/usr/local/lib/python3.6/site-packages/deco/conc.py", line 61, in __call__
return self.f(*args, **kwargs)
File "<string>", line 3, in _parallel
File "<string>", line 3, in <listcomp>
NameError: name '_concurrent' is not defined
Is it possible to use deco on nested functions? Thanks.
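One possible restructure is to move both decorated functions to module level and pass fn through as an argument (note that with real deco the fn argument would have to be picklable). The decorators below are inert stand-ins so the sketch runs without deco installed:

```python
# Sketch: define the decorated pair at module level instead of nesting
# them inside parallel(). These two decorators are no-op stand-ins for
# deco.concurrent and deco.synchronized, so the structure is runnable
# here without deco.
def concurrent(f):
    return f

def synchronized(f):
    return f

@concurrent
def _worker(fn, item):
    return fn(item)

@synchronized
def parallel(fn, items):
    return [_worker(fn, item) for item in items]

print(parallel(abs, [-3, -2, 1]))  # [3, 2, 1]
```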
@concurrent
def preprocess(txt):
    txt = rep.replace(txt)
    txt = txt.split(u'\n')
    return txt

@concurrent
@synchronized
def readfile(path):
    txt = codecs.open(path, encoding='utf-8').read().strip()
    return preprocess(txt)
In the above code I want preprocess to be called individually, hence it too is marked as concurrent; and since I am using it in readfile, I have marked readfile as both @concurrent and @synchronized.
However, this error is thrown: TypeError: unsupported operand type(s) for -: 'synchronized' and 'int'
First of all thanks for the great tool, it's really easy to use.
However, I've found strange behaviour when creating relatively large structures (e.g. a list with 1000000 entries) in the concurrent function and returning them to the synchronized function. The memory allocated for the list in the concurrent function is simply never freed again. I guess some reference is left over, maybe!? I've built a little stand-alone piece of code to reproduce the problem:
The setup looks like the following:
import multiprocessing
from deco import concurrent, synchronized

@concurrent
def my_conc():
    tmp = range(1000000)
    return tmp

@synchronized
def my_sync(my_dict):
    new_dict = {}
    for key, value in my_dict.iteritems():
        new_dict[key] = my_conc()

def main():
    cpus = multiprocessing.cpu_count()
    my_dict = {}
    for i in xrange(cpus):
        my_dict[i] = 0
    for i in xrange(100):
        print i
        my_sync(my_dict)

if __name__ == '__main__':
    main()
So, depending on the number of CPUs, I build n lists of 1000000 ints each, and call the synchronized function consecutively in a for loop. The allocated memory basically increases until all of it is used and my PC starts swapping...
As soon as I remove the decorators everything works fine (although not concurrently) ;). Also, this only happens if I return tmp from the my_conc() function. Once I replace it with 'return 0', everything's fine again.
I'm sorry if I misunderstood some limitation of the tool - it's my first time using parallel processing in python.
Thanks in advance!
I downloaded the archive and thought I would install from the directory containing setup.py, in a Py3.5 virtual env. Just trying the --help option gives:
$ ./setup.py --help
from: can't read /var/mail/distutils.core
./setup.py: line 3: syntax error near unexpected token `newline'
./setup.py: line 3: `setup('
pip install deco works just fine, so this is a minor issue. However, the PyPI version of deco seems to be out of date.
Is it possible to have your concurrent function accept new work items after the concurrent function has been started already? Perhaps with a work queue for instance?
I gather from the documentation and examples this is not possible at this time.
Nice work on a clean, simple way of enabling multiprocessing in a straightforward way!
Thank you,
Rob
Of late I am getting ConcurrentResult objects in my @synchronized methods; I have to explicitly call .wait() to wait for all the concurrent processes to finish.
Why is this happening?
deco seems to make my program use more memory than I expected (regardless of the number of workers).
Let's say I have code like this (details left out for clarity):
@concurrent
def slow(index):
    ...  # do something

def run():
    for index in iterator_w_200K_items:
        slow(index)
    slow.wait()
It seems like the iterator is being read all the way through at once (and pending jobs created). So it's using too much memory. (To verify I replaced iterator_w_200K_items with iterator_w_2K_items and memory usage went way down.)
Is there a way I can have deco work in smaller sized chunks?
I hope that makes sense.
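deco has no chunking option that I know of, but the effect can be approximated by waiting between bounded batches, e.g. calling slow() over one chunk and then slow.wait() before submitting the next. A stdlib sketch of the batching itself (a thread pool stands in for deco's process pool):

```python
# Chunking sketch (stdlib only): submit work in bounded batches so the
# number of pending jobs, and the memory they hold, stays small.
from itertools import islice
from multiprocessing.pool import ThreadPool

def slow(index):
    return index * 2  # placeholder for the real work

def chunks(iterable, size):
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

results = []
with ThreadPool(4) as pool:
    for batch in chunks(range(10), size=3):
        results.extend(pool.map(slow, batch))  # blocks per batch
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```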
Hey,
I know class methods don't work because, well, it's not implemented.
However, since standalone functions are objects in Python just as class methods are, what are the main sticking points preventing deco from being usable on class methods? Could we get some sort of "what needs to happen" list for that to be possible?
I would see real use for this on class methods. Say you have a DataObject that contains a gig of data in numpy arrays or something. Maybe that object also defines operations that can be performed on its data. It would be great from a software architecture perspective to keep the object-oriented code and those methods in the class definition, but still be able to parallel-process them the way you allow for standalone functions.
Am I making sense?
How is it possible to process results in the style of the concurrent.futures.as_completed construct, so that as soon as a result is available it can be processed?
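deco does not expose an as_completed hook that I know of; for comparison, here is a sketch of the equivalent pattern with the stdlib concurrent.futures API:

```python
# Sketch: handle each result as soon as it is ready, rather than
# waiting for the whole batch, using concurrent.futures.as_completed.
from concurrent.futures import ThreadPoolExecutor, as_completed

def work(n):
    return n * n

with ThreadPoolExecutor(max_workers=4) as ex:
    futures = [ex.submit(work, n) for n in range(5)]
    # as_completed yields futures in completion order, not submit order
    done = sorted(f.result() for f in as_completed(futures))
print(done)  # [0, 1, 4, 9, 16]
```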
Instead of decorating whole functions, can't we work at a lower granularity and mark a section of code, rather than delegating it to a separate function?
While experimenting with deco and pandas, I was hoping that the code below would work. The intention was to simulate parallel processing of a dummy pandas.DataFrame where a vectorized implementation is supposedly not possible.
import pandas as pd

n = 1000
df = pd.DataFrame({'str': 'Row' + pd.Series(range(n)).astype(str),
                   'num': pd.np.random.randint(1, 10, n)})
df.head()
Produces:
num str
0 3 Row0
1 8 Row1
2 1 Row2
3 2 Row3
4 9 Row4
Now trying to join the cells in each row with:
from deco import *

@concurrent
def join_str(series, sep=', '):
    return sep.join(map(str, series))

@synchronized
def joiner(df, cols, sep=', '):
    joined = pd.Series(index=df.index)
    for row in df.index:
        joined[row] = join_str(df.loc[row, cols], sep=sep)
    return joined

joiner(df, ['str', 'num'])
Gives an error, Assignment attempted on something that is not index based:
ValueError Traceback (most recent call last)
..........
D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
38 return node.id
39 elif type(node) is ast.Subscript:
---> 40 return SchedulerRewriter.subscript_name(node.value)
41 raise ValueError("Assignment attempted on something that is not index based")
42
D:\Anaconda\envs\py2k\lib\site-packages\deco\astutil.pyc in subscript_name(node)
39 elif type(node) is ast.Subscript:
40 return SchedulerRewriter.subscript_name(node.value)
---> 41 raise ValueError("Assignment attempted on something that is not index based")
42
43 def is_concurrent_call(self, node):
ValueError: Assignment attempted on something that is not index based
This is somewhat strange, given that the assignment is index based.
Is it because deco doesn't 'understand' this particular data structure (pandas.Series), or is there a problem in the code?
Hi, just wondering if you plan to update this to work on Python 3?
Loops like the following are currently not allowed in @synchronized functions, but do not throw a useful exception.
@synchronized
def run():
    x = []
    for i in range(...):
        x.append(concurrent(i))
    return x
Handling more cases of @concurrent result usage is a good idea, but more importantly, users should be warned when their function is invalid.
Python 3.5.1 / deco from pip installed from github
Test case:
from deco import concurrent, synchronized
import time

class DecoClass:
    @concurrent
    def work(self):
        time.sleep(0.1)

    @synchronized
    def run(self):
        for _ in range(100):
            self.work()

if __name__ == "__main__":
    start = time.time()
    x = DecoClass()
    x.run()
    print("Executing in serial should take 10 seconds")
    print("Executing in parallel took:", time.time() - start, "seconds")
The error message:
File "<unknown>", line 1
@synchronized
^
IndentationError: unexpected indent
I'm doing some heavy work on CSVs and would like to pass a generator into the @concurrent function, but the script just stalls; when I swap it out for a list, it actually runs.
When installing the version available on PyPI, the statement import conc in deco/__init__.py does not import deco.py correctly, eventually failing on usage. The master version seems to have corrected that issue.
It may be worth making a new release with this fix.
I'm using deco with Ubuntu 16 and Python 3. When I run my script, I can successfully spin up 6 processes, but they don't exit when the script finishes running. The next time I run it, it spins up another 6 processes, so in this example I'll have 12+ instances of python running.
Is there a way to gracefully shut them down that I'm missing?
Issue with my understanding of multiprocessing, not the module.
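For reference, deco manages its own pool internally, but the generic multiprocessing pattern for avoiding orphaned workers is the close/join pair, sketched here with a thread pool:

```python
# Generic pool shutdown sketch: close() stops new submissions and
# join() waits for the workers to exit, which is what prevents
# leftover worker processes hanging around after the script ends.
from multiprocessing.pool import ThreadPool

pool = ThreadPool(2)
try:
    result = pool.apply(len, ("abc",))
finally:
    pool.close()
    pool.join()
print(result)  # 3
```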
Hi,
Really love this idea, this is the way parallel processing should be :)
I'm having an issue where the same code will sometimes work as expected and create many processes; however, sometimes it gets stuck with a single process.
Wondering if anyone else has had this issue and whether there is a fix.
I've had this issue on OSX and Windows, in both plain Python and the IPython Qt console.
I'm using Python 3.4 on Ubuntu.
I'm testing out deco in a quick program I'm writing, located here.
However, I'm getting this cryptic error:
runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')
Traceback (most recent call last):
File "<ipython-input-4-b3482a262993>", line 1, in <module>
runfile('/home/alexpetralia/Desktop/boston_events/main.py', wdir='/home/alexpetralia/Desktop/boston_events')
File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 586, in runfile
execfile(filename, namespace)
File "/usr/lib/python3/dist-packages/spyderlib/widgets/externalshell/sitecustomize.py", line 48, in execfile
exec(compile(open(filename, 'rb').read(), filename, 'exec'), namespace)
File "/home/alexpetralia/Desktop/boston_events/main.py", line 71, in <module>
x = populate_df()
File "/usr/local/lib/python3.4/dist-packages/deco/conc.py", line 47, in __call__
rewriter.visit(node.body[0])
File "/usr/lib/python3.4/ast.py", line 245, in visit
return visitor(node)
File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 85, in visit_FunctionDef
self.generic_visit(node)
File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "/usr/lib/python3.4/ast.py", line 253, in generic_visit
self.visit(item)
File "/usr/lib/python3.4/ast.py", line 245, in visit
return visitor(node)
File "/usr/local/lib/python3.4/dist-packages/deco/astutil.py", line 68, in generic_visit
name = child.targets[0].value
AttributeError: 'Tuple' object has no attribute 'value'
Any ideas what may be causing this?
The following code works:
@concurrent
def scrape(link):
    # do work
    pass

x = [None] * 100000

@synchronized
def populate_df(links):
    for i, link in enumerate(links):
        x[i] = scrape(link)
    return x
However, this code does not:
@concurrent
def scrape(link):
    # do work
    pass

@synchronized
def populate_df(links):
    x = []
    for i, link in enumerate(links):
        x.append(scrape(link))
    return x
Instead it returns a list of ApplyResult objects, one for each element in the initial links list:
<multiprocessing.pool.ApplyResult at 0x7f784ba424a8>,
<multiprocessing.pool.ApplyResult at 0x7f784ba42550>,
<multiprocessing.pool.ApplyResult at 0x7f784ba425f8>,
... (30 ApplyResult entries in total) ...
<multiprocessing.pool.ApplyResult at 0x7f784ba487f0>]
Would it be possible to get append to work?
I'm getting this in Jupyter Notebook 5.7.8 with WinPython 3.6.8-64 on Windows 10. This is the first deco'd example from here.
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback:
"""
Traceback (most recent call last):
  File "c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py", line 10, in concWrapper
    result = concurrent.functions[f](*args, **kwargs)
KeyError: 'slow'
"""

The above exception was the direct cause of the following exception:

KeyError                                  Traceback (most recent call last)
<ipython-input> in <module>
      4     slow(index)
      5
----> 6 run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in __call__(self, *args, **kwargs)
     59         exec(out, scope)
     60         self.f = scope[self.orig_f.__name__]
---> 61         return self.f(*args, **kwargs)

<string> in run()

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in wait(self)
    134         results = []
    135         while self.results:
--> 136             result, operations = self.results.pop().get()
    137             self.apply_operations(operations)
    138             results.append(result)

c:\wpy64-368zro\python-3.6.8.amd64\lib\site-packages\deco\conc.py in get(self)
    154
    155     def get(self):
--> 156         return self.async_result.get(3e+6)
    157
    158     def result(self):

c:\wpy64-368zro\python-3.6.8.amd64\lib\multiprocessing\pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645
    646     def _set(self, i, obj):

KeyError: 'slow'
# after.py
import time

from deco import concurrent, synchronized

@concurrent
def slow(index):
    time.sleep(5)

@synchronized
def run():
    for index in list('123'):
        slow(index)

run()
Isn't there supposed to be a process_lat_lon.wait() on the second-to-last line in the example in the README?
Hey,
I am looking at deco and thought would be cool to use.
What I am doing: I have a parent thread which creates a CSV, writes headers to the file, and then spawns child threads to go to a REST API and request data until there is no more data to request, at which point they return a list of results and the parent thread writes them after the header.
I keep hitting the same error. The error is a bit meaningless, and there are no comments in the code, so I can't make heads or tails of what is going on.
Any thoughts?
Stack below:
Error
Traceback (most recent call last):
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 67, in testPartExecutor
yield
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\unittest2\case.py", line 625, in run
testMethod()
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\freezegun\api.py", line 451, in wrapper
result = func(*args, **kwargs)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\mock\mock.py", line 1305, in patched
return func(*args, **keywargs)
File "D:\Projects\Python\dbexport\tests\test_dbexport.py", line 95, in test_something
export(database, output_dir=actual_dir)
File "D:\Projects\Python\dbexport\dbexport\dbexport.py", line 53, in export
export_dbrw(db_details, start_time, end_time, target_directory)
File "D:\Projects\Python\dbexport\dbexport\dbrw_export.py", line 129, in export_dbrw
run_dbrw_query(query.get('query_name'), bindings, write_headers, (csvwriter,), append_records, (csvwriter,))
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\conc.py", line 47, in __call__
rewriter.visit(node.body[0])
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 85, in visit_FunctionDef
self.generic_visit(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 249, in generic_visit
self.visit(item)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 55, in generic_visit
super(NodeTransformer, self).generic_visit(node)
File "C:\Python27\Lib\ast.py", line 251, in generic_visit
self.visit(value)
File "C:\Python27\Lib\ast.py", line 241, in visit
return visitor(node)
File "C:\Users\an.other\Envs\deco-bug\lib\site-packages\deco\astutil.py", line 57, in generic_visit
returns = [i for i, child in enumerate(node.body) if type(child) is ast.Return]
TypeError: 'Name' object is not iterable
Cheers. Nice piece of kit (when I did get it to work)!
Is it possible to have a @synchronized function whose body calls another function that is also decorated with @synchronized?
from deco import *
from time import sleep

@concurrent
def bconc():
    sleep(3)
    return 1

@concurrent
def fconc():
    sleep(10)
    return 2

@synchronized
def bar():
    bd = {}
    for i in range(5):
        bd[i] = bconc()
    return bd

@synchronized
def foo():
    bd = bar()
    fd = {}
    for i in range(5):
        fd[i] = fconc()
    return fd, bd

print foo()
site-packages\deco\conc.py", line 57, in __call__
out = compile(self.ast, "<string>", "exec")
TypeError: required field "lineno" missing from stmt
Hi guys,
I'm on Windows, so forgive me in advance if that's the cause of any problems. I'm also a beginner when it comes to async IO, so I might just be misunderstanding something.
I'm making a request to the google app-engine (google analytics) using the pandas ga module, which uses OAuth to communicate with the analytics portion of the app engine.
Here's the code I had written:
import pandas.io.ga as ga
import pandas as pd
from deco import concurrent, synchronized
import time

@concurrent
def d_fetch(date, hour):
    t0 = time.time()
    data[str(date)+'h'+str(hour)] = [
        ga.read_ga(
            account_id = "xxx",
            profile_id = "xxx",
            property_id = "UA-xxx",
            metrics = ['sessions', 'hits', 'bounces'],
            dimensions = ['date', 'hour', 'minute', 'medium', 'keyword'],
            start_date = date,
            end_date = date,
            index_col = 0,
            filters = "hour==" + '{0:02d}'.format(hour))]
    t1 = time.time()
    data[str(date)+'h'+str(hour)].append(round(t1-t0, 2))
    print str(date)+str(hour)+": completed in "+str(round(t1-t0, 2))+" secs."

@synchronized
def run(data, dates):
    for date in dates:
        for hour in xrange(24):
            d_fetch(date, hour)

if __name__ == "__main__":
    somemute = {}
    date_range = pd.date_range(start='5/8/2016', end='5/8/2016', freq='D')
    t0 = time.time()
    run(somemute, date_range)
    t1 = time.time()
    print "TOOK", round(t1-t0, 2)
And the error that was being raised:
Thanks!
Matt
Hello,
Here is a program I quickly put together in order to play around with Deco:
import collections
import time

from deco import concurrent, synchronized

@concurrent
def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    time.sleep(2)
    return x + y

@synchronized
def print_sum():
    results = collections.defaultdict(dict)
    results[1] = compute(1, 2)
    results[2] = compute(1, 2)
    results[3] = compute(1, 2)
    results[4] = compute(1, 2)
    final_result = results[1] + results[2] + results[3] + results[4]
    print("combined results {}".format(final_result))

print_sum()
Note: I know it's not great, but wanted something quick to play with
But I get the following error...
AttributeError: 'dict' object has no attribute 'iteritems'
...which suggests the code isn't suitable for Python 3 as per this Stack Overflow response to a similar error: http://stackoverflow.com/a/30418498
Am I mistaken, or missing something obvious?
Thanks.
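That reading is right: dict.iteritems() was removed in Python 3, where items() returns a view that serves the same purpose, so any library code calling iteritems() is Python 2 only. A minimal illustration, including a shim (the shim name is my own) for code that must run on both:

```python
# dict.iteritems() exists only in Python 2; items() is the replacement.
d = {"a": 1, "b": 2}

pairs = sorted(d.items())            # works on both Python 2 and 3
assert not hasattr(d, "iteritems")   # True on Python 3

# A minimal compatibility shim for code that must support both versions:
def iteritems(mapping):
    return iter(mapping.items())

shimmed = sorted(iteritems(d))
```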
from PIL import Image
from deco import concurrent, synchronized
import time

@Concurrent
def slow(index):
    time.sleep(5)

@synchronized
def run():
    for index in list('123'):
        slow(index)

run()
When I ran the code above, I get error msg below:
"C:\Program Files\Anaconda3\python.exe" C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py
Traceback (most recent call last):
File "C:/Users/zlan1/PycharmProjects/hellow/parallel_computing/deco.py", line 2, in
from deco import concurrent, synchronized
File "C:\Users\zlan1\PycharmProjects\hellow\parallel_computing\deco.py", line 2, in
from deco import concurrent, synchronized
ImportError: cannot import name 'concurrent'
Process finished with exit code 1
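The traceback shows the script importing itself: the file is named deco.py, and Python searches the script's own directory before site-packages, so the file shadows the installed library (renaming it should fix the ImportError). A quick way to check what an import would actually resolve to, shown here with json as a stand-in for deco:

```python
# Check which file an import resolves to before blaming the package.
# sys.path[0] (the script's directory) is searched first, so a local
# file named deco.py shadows the installed deco library.
import importlib.util

spec = importlib.util.find_spec("json")  # stand-in for "deco" here
print(spec.origin)  # path of the file that `import json` would load
```

If `spec.origin` points into your project directory instead of site-packages, you have found the shadowing file.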
Rather than making a bunch of redundant arguments for the @concurrent
decorator, pass the arguments directly to the pool's constructor.
This also avoids a Python 2.6 incompatibility pointed out by @gst
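A minimal sketch of the idea, not deco's actual implementation: a decorator factory that forwards **pool_kwargs straight to the pool constructor, so every pool option is supported without being mirrored as a decorator argument (a thread-backed Pool is used here so the sketch runs anywhere):

```python
# Decorator factory forwarding **pool_kwargs to the Pool constructor.
from multiprocessing.dummy import Pool  # thread-backed Pool, same API
import functools

def concurrent(**pool_kwargs):
    def decorator(func):
        pool = Pool(**pool_kwargs)  # e.g. processes=4 goes straight through
        @functools.wraps(func)
        def wrapper(*args):
            return pool.apply_async(func, args)
        return wrapper
    return decorator

@concurrent(processes=4)  # forwarded verbatim to Pool(processes=4)
def square(x):
    return x * x

results = [square(i) for i in range(5)]   # AsyncResult handles
values = [r.get() for r in results]       # resolve them
```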
hi,
guys, thanks for your great lib. I read about it on the blog https://www.peterbe.com/plog/deco
I have a question about the usage of synchronized: the code below works fine,
but after a small modification it no longer works.
The reason I am doing this is that I want to pass the function to run
as a parameter.
I am wondering if you could kindly explain why.
Thanks!
Working version:
import time, requests
from deco import concurrent, synchronized

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1 - t0

@synchronized
def run(data):
    for url in urls:
        download(url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print("TOOK", t1 - t0)
print("WOULD HAVE TAKEN", sum(data.values()), "seconds")
not working version:
import time, requests
from deco import concurrent, synchronized

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1 - t0

function_map = {
    'download': download
}

@synchronized
def run(data):
    for url in urls:
        function_map['download'](url, data)
    print(data)

t0 = time.time()
data = {}
run(data)
print(data)
t1 = time.time()
print("TOOK", t1 - t0)
print("WOULD HAVE TAKEN", sum(data.values()), "seconds")
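A likely explanation, hedged since it rests on how @synchronized rewrites the caller's AST: the rewrite only recognizes direct calls by name to @concurrent functions, and `function_map['download'](...)` is a subscript call, not a name, so it goes undetected. The distinction is visible in the AST itself:

```python
# Why the dict lookup defeats the decorator: a direct call has a Name
# as its func node, while a call through a dict lookup has a Subscript.
import ast

call_direct = ast.parse("download(url, data)").body[0].value
call_indirect = ast.parse("function_map['download'](url, data)").body[0].value

print(type(call_direct.func).__name__)    # a Name node
print(type(call_indirect.func).__name__)  # a Subscript node
```

If you want to select the worker function at runtime, selecting it *before* the @synchronized function and calling it directly by a single name inside should keep it visible to the rewrite.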
Looking through the examples, this script doesn't appear to be Deco related. Am I missing something?
I just wanted to verify this, since it doesn't seem to be documented.
In conc_test.py I see the use of test.processes. Does this set the maximum thread/process count for that specific concurrent function? And what does test.p = None refer to?
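I can't speak to deco's internals, but the knob such a setting would ultimately control is the `processes` argument of the underlying pool, which caps the worker count:

```python
# multiprocessing.Pool(processes=N) caps the number of workers; the
# thread-backed variant is used here so the sketch runs anywhere.
from multiprocessing.dummy import Pool

with Pool(processes=2) as pool:  # at most 2 workers service the map
    doubled = pool.map(lambda x: x * 2, range(6))
```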
The following example seems to not work:
@concurrent
def get_region(region_filepath, variables_to_get):
    return {}

@synchronized
def get_data(variable_list, regions):
    return_variables = {}
    for key in variable_list:
        return_variables[key] = []
    region_dicts = []
    for lat_lon in regions:
        region_dicts.append(get_region(root_dir + lat_lon, variable_list))
    #get_region.wait()
    print("len", len(region_dicts), regions)
    #consolidate all the regions
    for region in region_dicts:
        for key in region.keys():
            return_variables[key].extend(region[key])
    return return_variables
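For comparison, here is the same gather-then-merge pattern written with the stdlib, using a stub get_region and an assumed root_dir; the futures are resolved explicitly before consolidation, which is the step the commented-out get_region.wait() hints at:

```python
# Gather-then-merge with explicit result resolution; get_region is a stub
# and root_dir is a placeholder path, both assumptions for illustration.
from concurrent.futures import ThreadPoolExecutor

def get_region(region_filepath, variables_to_get):
    # Stub: the real version would read data for this region from disk.
    return {key: [region_filepath] for key in variables_to_get}

def get_data(variable_list, regions, root_dir="/data/"):
    return_variables = {key: [] for key in variable_list}
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(get_region, root_dir + lat_lon, variable_list)
                   for lat_lon in regions]
        region_dicts = [f.result() for f in futures]  # wait for every region
    # Consolidate all the regions only after every result is resolved.
    for region in region_dicts:
        for key in region:
            return_variables[key].extend(region[key])
    return return_variables

out = get_data(["temp"], ["10_20", "30_40"])
```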