nerdwalletoss / kinesis-python
Low level, multiprocessing based AWS Kinesis producer & consumer library
License: Other
See the stack trace below:
11:40:18 celery_general.1 | [2018-10-18 15:40:18,210: ERROR/ForkPoolWorker-2] Task xxx.apps.marketo.tasks.update_marketingdata_lead.update_marketingdata_lead[9f1605d0-bfe2-4549-8cf2-91abcdc8fd7b] raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
11:40:18 celery_general.1 | Traceback (most recent call last):
11:40:18 celery_general.1 | File "venv/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
11:40:18 celery_general.1 | R = retval = fun(*args, **kwargs)
11:40:18 celery_general.1 | File "venv/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
11:40:18 celery_general.1 | return self.run(*args, **kwargs)
11:40:18 celery_general.1 | File "xxxx/apps/marketo/tasks/update_marketingdata_lead.py", line 81, in update_marketingdata_lead
11:40:18 celery_general.1 | _enqueue_lead_data(email, to_update, user_id)
11:40:18 celery_general.1 | File "xxxx/apps/marketo/tasks/update_marketingdata_lead.py", line 93, in _enqueue_lead_data
11:40:18 celery_general.1 | get_lead_producer().put(json.dumps(lead_data))
11:40:18 celery_general.1 | File "xxxx/apps/xxxx_kinesis/__init__.py", line 39, in get_lead_producer
11:40:18 celery_general.1 | buffer_time=5 * 60,
11:40:18 celery_general.1 | File "venv/lib/python2.7/site-packages/kinesis/producer.py", line 142, in __init__
11:40:18 celery_general.1 | max_size=max_size, boto3_session=boto3_session)
11:40:18 celery_general.1 | File "venv/lib/python2.7/site-packages/kinesis/producer.py", line 78, in __init__
11:40:18 celery_general.1 | self.start()
11:40:18 celery_general.1 | File "venv/lib/python2.7/site-packages/offspring/process.py", line 55, in start
11:40:18 celery_general.1 | self.process.start()
11:40:18 celery_general.1 | File "/usr/local/Cellar/python@2/2.7.15_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
11:40:18 celery_general.1 | 'daemonic processes are not allowed to have children'
11:40:18 celery_general.1 | AssertionError: daemonic processes are not allowed to have children
My understanding is that using billiard instead of multiprocessing will solve this issue.
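The failure is easy to reproduce outside Celery. A minimal sketch (assuming a Unix host with the fork start method) of a daemonic process trying to spawn a child, which is exactly what happens when KinesisProducer calls self.start() inside a Celery prefork worker:

```python
import multiprocessing

def try_spawn_child(q):
    # Inside a daemonic process, creating a grandchild is disallowed
    # and raises the AssertionError seen in the traceback above.
    try:
        multiprocessing.get_context("fork").Process(target=print).start()
        q.put("ok")
    except AssertionError as exc:
        q.put(str(exc))

if __name__ == "__main__":
    ctx = multiprocessing.get_context("fork")  # assuming a Unix host
    q = ctx.Queue()
    p = ctx.Process(target=try_spawn_child, args=(q,))
    p.daemon = True  # Celery prefork pool workers are daemonic like this
    p.start()
    print(q.get())   # daemonic processes are not allowed to have children
    p.join()
```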
Thank you for your work!
I recently implemented the DynamoDB backend, but when I run multiple instances (via Kubernetes) I find that one instance errors out while another picks up. The end result is that only one instance is processing at a time. Is there something I am missing?
Here is the error:
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/kinesis/consumer.py", line 205, in __iter__
self.state.checkpoint(state_shard_id, item['SequenceNumber'])
File "/usr/local/lib/python3.9/site-packages/kinesis/state.py", line 41, in checkpoint
self.dynamo_table.update_item(
File "/usr/local/lib/python3.9/site-packages/boto3/resources/factory.py", line 580, in do_action
response = action(self, *args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/boto3/resources/action.py", line 88, in __call__
response = getattr(parent.meta.client, operation_name)(*args, **params)
File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 530, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 960, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.ConditionalCheckFailedException: An error occurred (ConditionalCheckFailedException) when calling the UpdateItem operation: The conditional request failed
Based on the update expression:
self.dynamo_table.update_item(
Key={'shard': shard_id},
UpdateExpression="set seq = :seq",
ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
ExpressionAttributeValues={
':fqdn': fqdn,
':seq': seq,
}
)
It seems likely that the failing condition is the fqdn check, which would be expected if another consumer came in and claimed the shard. But since that seems like a normal thing to happen, I am not sure why the shard isn't relinquished gracefully instead of raising.
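For illustration, a graceful relinquish could mean catching the conditional failure and telling the reader to stop, rather than letting the exception propagate. This is a hypothetical sketch, not the library's code; the exception class is stubbed so the sketch is self-contained (with real boto3 it would be `table.meta.client.exceptions.ConditionalCheckFailedException`):

```python
class ConditionalCheckFailed(Exception):
    """Stand-in for botocore's ConditionalCheckFailedException."""

def checkpoint_or_relinquish(table, shard_id, fqdn, seq,
                             failed_exc=ConditionalCheckFailed):
    """Return True if we still own the shard, False if another consumer
    has claimed it (in which case the caller should stop reading it)."""
    try:
        table.update_item(
            Key={"shard": shard_id},
            UpdateExpression="set seq = :seq",
            ConditionExpression=("fqdn = :fqdn AND "
                                 "(attribute_not_exists(seq) OR seq < :seq)"),
            ExpressionAttributeValues={":fqdn": fqdn, ":seq": seq},
        )
        return True
    except failed_exc:
        # Another consumer owns the shard now; relinquish instead of crashing.
        return False
```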
Any help is appreciated! Thanks!
Hi,
When I try to initialise the KinesisProducer object, it throws this error:
self.kproducer = KinesisProducer(stream_name=produce_stream_name)
Traceback:
2018-07-28 23:58:19,660 - botocore.loaders - MainThread - DEBUG - Loading JSON file: C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\botocore\data\_retry.json
2018-07-28 23:58:19,662 - botocore.client - MainThread - DEBUG - Registering retry handlers for service: kinesis
Traceback (most recent call last):
File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1668, in <module>
main()
File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1662, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1072, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "C:/Users/662176/Documents/_Projects/_makerfaire/drone_showcase/pose_recognizer/src/webapp/app.py", line 27, in <module>
consumer_stream_name="poserec_results")
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\src\poseapp\poseapp_kinesis.py", line 41, in __init__
self.kproducer = KinesisProducer(stream_name=produce_stream_name)
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\kinesis\producer.py", line 142, in __init__
max_size=max_size, boto3_session=boto3_session)
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\kinesis\producer.py", line 78, in __init__
self.start()
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 55, in start
self.process.start()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
return Popen(process_obj)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
reduction.dump(process_obj, to_child)
File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'Subprocess.start.<locals>.bootstrap'
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 75, in atexit
recursively_shutdown(cls)
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 71, in recursively_shutdown
11:40:18 2018-07-28 23:58:19,957 - offspring.process - MainThread - DEBUG - Shutting down <kinesis.producer.AsyncProducer object at 0x000002746D1E68D0>
recursively_shutdown(klass)
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 74, in recursively_shutdown
obj.shutdown()
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 85, in shutdown
self.wait()
File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 89, in wait
self.process.join()
File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 123, in join
assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process
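The root cause is that Windows' spawn start method pickles the Process target, and offspring's bootstrap is a function defined locally inside Subprocess.start, which pickle cannot reference by name. A minimal sketch reproducing the same failure by forcing the spawn context:

```python
import multiprocessing

def make_local_target():
    def bootstrap():  # a local (closure) function, analogous to
        pass          # offspring's Subprocess.start.<locals>.bootstrap
    return bootstrap

if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")  # Windows always uses spawn
    p = ctx.Process(target=make_local_target())
    try:
        p.start()  # spawn must pickle the Process object, including its target
    except AttributeError as exc:
        print(exc)  # Can't pickle local object 'make_local_target.<locals>.bootstrap'
```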
Will greatly appreciate any help.
While doing some testing of kinesis-python I was evaluating how well it handles live resharding. As old shards remained, kinesis-python would spin up a ShardReader and shut it down for each inactive shard. Offspring keeps track of these instances only to clean them up on exit, but they build up over time, eventually resulting in [Errno 24] Too many open files.
Offspring looks unmaintained, but I submitted a pull request that would fix it: borgstrom/offspring#3
Hi,
We are looking to use this library in our project and would be happy to pick up some of the project maintenance if needed.
However, we do need to submit a PR that adds the possibility to pass in an endpoint_url for both the Kinesis (consumer.KinesisConsumer) and DynamoDB (state.DynamoDB) setup, along with a session. We need this so we can work with Localstack.
I'll put together the PR but just wanted to check that you're open to this beforehand!
Thanks
I am new to this tech stack and have been going through the code.
I am trying to find the method should_start_shard_reader, referenced as self.should_start_shard_reader in DynamoDB, but I cannot locate its definition.
Have I missed anything?
I have a question regarding setting the sleep interval for the subprocesses in the consumer, e.g. reader_sleep_time=12.
Should setting this variable actually slow down the loop (the __iter__ overridden in the class), or does it internally set the sleep interval for the subprocess that is listening to the stream?
I could not find anything regarding this in the documentation.
thanks
Looking at the producer's loop, it looks like there is no limit on the number of messages per flush (only the payload size is checked).
In the case of many small messages, one might surpass the 500-record limit and the put_records call would fail.
Have I missed something?
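For illustration, a sketch of how the flush could additionally cap the record count (PutRecords accepts at most 500 records and 5 MiB per request; the helper name is mine, not the library's):

```python
def chunk_records(records, max_count=500):
    """Split a buffered batch into PutRecords-sized slices of at most
    max_count records each (500 is the Kinesis PutRecords limit)."""
    for start in range(0, len(records), max_count):
        yield records[start:start + max_count]
```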
Thanks,
itamar
Hi! First of all, thanks for the work done on this library. The KCL is definitely not ideal with its dependency on the Java MultiLangDaemon...
I noticed the library uses get_records() instead of subscribe_to_shard(), which limits throughput to the 2 MB/s per shard shared among all consumers.
https://docs.amazonaws.cn/en_us/streams/latest/dev/building-enhanced-consumers-api.html
Do you have plans to migrate the consumers to enhanced fan-out?
Thanks for this library! It doesn't work out of the box with Python 3 because the "Queue" module was renamed to "queue".
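A common way to support both interpreters without adding a dependency like six is a guarded import:

```python
try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2: the module was named "Queue"
```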
I have a branch of this project that handles this at: https://github.com/max-lg/kinesis-python/tree/python3compatability
I'm not sure if it meets your coding standards or if you intend to support python3 but I'd be happy to send a pull request or feel free to copy/paste the code snippet :)
This code is very helpful for understanding how to work with Kinesis Data Streams - thank you!
While sifting through it, I noticed that retries, which controls the ShardReader's back-off, is never actually incremented as far as I can tell; this line adds 1 but never saves the incremented value:
kinesis-python/src/kinesis/consumer.py
Line 54 in 2bf3551
This should work:
self.retries += 1
log.debug("Retrying get_records (#%d %ds): %s", self.retries, loop_status, exc)
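For what it's worth, once retries is actually saved, a capped exponential back-off is the usual pattern for this kind of reader loop. A hypothetical helper (the constants are mine, not the library's):

```python
def backoff_delay(retries, base=0.5, cap=30.0):
    """Delay (seconds) before retrying get_records: doubles with each
    consecutive failure, capped so a flapping shard never stalls forever."""
    return min(cap, base * (2 ** retries))
```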
Maybe I'm missing something, but it looks like there's no obvious way to use this library with a VPC interface endpoint for Kinesis streams, to keep traffic internal to a VPC (and avoid the costs of sending the consumer/producer traffic over the public internet).
This seems to be supported by boto3's client constructor, so it should be a simple change:
self.kinesis_client = self.boto3_session.client('kinesis', endpoint_url="https://<URL>")
If you think this would be beneficial, I am happy to send a PR.
Hi there,
I ran the command pip3 install kinesis-python, but I got the following error:
Collecting kinesis-python
Downloading kinesis-python-0.0.4.tar.gz
Complete output from command python setup.py egg_info:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/private/var/folders/b0/m1wbw4x14tn78q7vym9sd8c80000gn/T/pip-build-rinms_kk/kinesis-python/setup.py", line 3, in <module>
with open('VERSION') as version_fd:
FileNotFoundError: [Errno 2] No such file or directory: 'VERSION'
----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/b0/m1wbw4x14tn78q7vym9sd8c80000gn/T/pip-build-rinms_kk/kinesis-python/
I guess the published package is missing the VERSION file that setup.py expects.
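The proper fix is presumably to ship the VERSION file in the sdist (e.g. via MANIFEST.in), but setup.py could also fail soft instead of crashing. A hypothetical sketch, not the project's code:

```python
import os

def read_version(path="VERSION", default="0.0.0"):
    """Read the version from the VERSION file if the sdist includes it;
    fall back to a default instead of crashing setup.py. (Sketch only --
    the real fix is including VERSION in the packaged sdist.)"""
    if os.path.exists(path):
        with open(path) as fd:
            return fd.read().strip()
    return default
```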
Cheers.
Hello,
Please note this
from kinesis.consumer import KinesisConsumer
from kinesis.state import DynamoDB
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named state
Regards,
kinesis-python/src/kinesis/producer.py
Line 126 in 2bf3551
By default, failure of individual records within a request does not stop the processing of subsequent records in a boto3 kinesis client put_records() request. This means that the Records array returned from put_records() includes both successfully and unsuccessfully processed records. This library does not detect unsuccessfully processed records and resubmit them in a subsequent call.
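A sketch of the missing handling, assuming the standard PutRecords response shape (a FailedRecordCount plus a positional Records array where failed entries carry an ErrorCode instead of a SequenceNumber); the function name and retry count are mine:

```python
def put_records_with_retry(client, stream_name, records, max_attempts=3):
    """Resubmit only the records that PutRecords reports as failed.
    Returns whatever is still failing after max_attempts."""
    for _ in range(max_attempts):
        resp = client.put_records(StreamName=stream_name, Records=records)
        if resp.get("FailedRecordCount", 0) == 0:
            return []
        # The response Records array is positional: failed entries have
        # an ErrorCode instead of a SequenceNumber.
        records = [rec for rec, result in zip(records, resp["Records"])
                   if "ErrorCode" in result]
    return records
```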
Do I understand it correctly that the code checkpoints as soon as an element is yielded?
(kinesis-python/src/kinesis/consumer.py, Line 209 in 5305f7b)
The issue I have with this behaviour is that yielding an item does not imply it has been processed (whatever that means for me). So if my code fails after I've received a record but before I have processed it, the record will be marked as processed in DynamoDB, and I will lose that piece of information.
Is my understanding correct? If so, do you think there should be a mechanism to manually checkpoint data? I don't think it's difficult to code up, it's just tricky in terms of API - since you're making shards transparent, it's not quite clear which shard can be checkpointed at what time.
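One possible shape for a manual-checkpoint mechanism, sketched as a wrapper that tracks the newest sequence number seen per shard and only commits when the caller confirms processing succeeded. Everything here is hypothetical, not the library's interface:

```python
class ManualCheckpointer:
    """Track the newest sequence number yielded per shard; persist only
    when the caller explicitly checkpoints after processing succeeds."""

    def __init__(self, commit):
        # commit is the persistence callback, e.g. a function wrapping
        # state.checkpoint(shard_id, seq) in the DynamoDB backend.
        self._commit = commit
        self._pending = {}

    def seen(self, shard_id, seq):
        # Called when a record is yielded; nothing is persisted yet.
        self._pending[shard_id] = seq

    def checkpoint(self):
        # Called by user code once the yielded records are truly processed.
        for shard_id, seq in self._pending.items():
            self._commit(shard_id, seq)
        self._pending.clear()
```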
Hi! Thank you for your hard work. I was curious if you are planning on introducing the ability to specify the partition keys upon put record?