
kinesis-python's People

Contributors

borgstrom, buildnerd, hengfengli, jasonbot, langseth


kinesis-python's Issues

Cannot be used from within Celery

See the stack trace below:

11:40:18 celery_general.1 | [2018-10-18 15:40:18,210: ERROR/ForkPoolWorker-2] Task xxx.apps.marketo.tasks.update_marketingdata_lead.update_marketingdata_lead[9f1605d0-bfe2-4549-8cf2-91abcdc8fd7b] raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
11:40:18 celery_general.1 | Traceback (most recent call last):
11:40:18 celery_general.1 |   File "venv/lib/python2.7/site-packages/celery/app/trace.py", line 367, in trace_task
11:40:18 celery_general.1 |     R = retval = fun(*args, **kwargs)
11:40:18 celery_general.1 |   File "venv/lib/python2.7/site-packages/celery/app/trace.py", line 622, in __protected_call__
11:40:18 celery_general.1 |     return self.run(*args, **kwargs)
11:40:18 celery_general.1 |   File "xxxx/apps/marketo/tasks/update_marketingdata_lead.py", line 81, in update_marketingdata_lead
11:40:18 celery_general.1 |     _enqueue_lead_data(email, to_update, user_id)
11:40:18 celery_general.1 |   File "xxxx/apps/marketo/tasks/update_marketingdata_lead.py", line 93, in _enqueue_lead_data
11:40:18 celery_general.1 |     get_lead_producer().put(json.dumps(lead_data))
11:40:18 celery_general.1 |   File "xxxx/apps/xxxx_kinesis/__init__.py", line 39, in get_lead_producer
11:40:18 celery_general.1 |     buffer_time=5 * 60,
11:40:18 celery_general.1 |   File "venv/lib/python2.7/site-packages/kinesis/producer.py", line 142, in __init__
11:40:18 celery_general.1 |     max_size=max_size, boto3_session=boto3_session)
11:40:18 celery_general.1 |   File "venv/lib/python2.7/site-packages/kinesis/producer.py", line 78, in __init__
11:40:18 celery_general.1 |     self.start()
11:40:18 celery_general.1 |   File "venv/lib/python2.7/site-packages/offspring/process.py", line 55, in start
11:40:18 celery_general.1 |     self.process.start()
11:40:18 celery_general.1 |   File "/usr/local/Cellar/python@2/2.7.15_1/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 124, in start
11:40:18 celery_general.1 |     'daemonic processes are not allowed to have children'
11:40:18 celery_general.1 | AssertionError: daemonic processes are not allowed to have children

My understanding is that using billiard instead of multiprocessing would solve this issue.
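
A minimal, untested sketch of that idea, assuming offspring resolves multiprocessing at import time (billiard is Celery's maintained fork of multiprocessing and allows daemonic processes to spawn children):

    import sys

    import billiard

    # Hypothetical monkeypatch: make every subsequent "import multiprocessing"
    # resolve to billiard before kinesis/offspring are imported.
    sys.modules['multiprocessing'] = billiard

    from kinesis.producer import KinesisProducer

    producer = KinesisProducer(stream_name='my-stream')  # placeholder stream name
    producer.put('{"hello": "world"}')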

How to have multiple consumers?

Thank you for your work!

I recently implemented the DynamoDB backend, but when I go to run multiple instances (via Kubernetes) I find that one of the instances errors out, while another picks up. The end result is that only one instance is processing at a time. Is there something I am missing?

Here is the error:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.9/site-packages/kinesis/consumer.py", line 205, in __iter__
        self.state.checkpoint(state_shard_id, item['SequenceNumber'])
      File "/usr/local/lib/python3.9/site-packages/kinesis/state.py", line 41, in checkpoint
        self.dynamo_table.update_item(
      File "/usr/local/lib/python3.9/site-packages/boto3/resources/factory.py", line 580, in do_action
        response = action(self, *args, **kwargs)
      File "/usr/local/lib/python3.9/site-packages/boto3/resources/action.py", line 88, in __call__
        response = getattr(parent.meta.client, operation_name)(*args, **params)
      File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 530, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 960, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.errorfactory.ConditionalCheckFailedException: An error occurred (ConditionalCheckFailedException) when calling the UpdateItem operation: The conditional request failed

Based on the update expression:

            self.dynamo_table.update_item(
                Key={'shard': shard_id},
                UpdateExpression="set seq = :seq",
                ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
                ExpressionAttributeValues={
                    ':fqdn': fqdn,
                    ':seq': seq,
                }
            )

It seems likely that the failing condition is the fqdn check, which would make sense if another consumer came in and claimed the shard. But since that is an expected event, I am not sure why there isn't a graceful relinquish of the shard.
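
A sketch of what a graceful relinquish could look like (my own illustration; the function and its arguments are hypothetical, not the library's actual API):

    import botocore.exceptions

    def checkpoint_or_relinquish(dynamo_table, shard_id, fqdn, seq):
        try:
            dynamo_table.update_item(
                Key={'shard': shard_id},
                UpdateExpression="set seq = :seq",
                ConditionExpression="fqdn = :fqdn AND (attribute_not_exists(seq) OR seq < :seq)",
                ExpressionAttributeValues={':fqdn': fqdn, ':seq': seq},
            )
            return True   # checkpoint written; this consumer still owns the shard
        except botocore.exceptions.ClientError as exc:
            if exc.response['Error']['Code'] == 'ConditionalCheckFailedException':
                return False  # another consumer claimed the shard; stop reading it
            raise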

Any help is appreciated! Thanks!

AttributeError: Can't pickle local object 'Subprocess.start.<locals>.bootstrap'

Hi,

When I try to initialise the KinesisProducer object, it throws this error:

    self.kproducer = KinesisProducer(stream_name=produce_stream_name)

Traceback:

2018-07-28 23:58:19,660 - botocore.loaders - MainThread - DEBUG - Loading JSON file: C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\botocore\data\_retry.json
2018-07-28 23:58:19,662 - botocore.client - MainThread - DEBUG - Registering retry handlers for service: kinesis
Traceback (most recent call last):
  File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1668, in <module>
    main()
  File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1662, in main
    globals = debugger.run(setup['file'], None, None, is_module)
  File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\pydevd.py", line 1072, in run
    pydev_imports.execfile(file, globals, locals)  # execute the script
  File "C:\Users\662176\AppData\Roaming\JetBrains\PyCharm Community Edition 2017.3.3\helpers\pydev\_pydev_imps\_pydev_execfile.py", line 18, in execfile
    exec(compile(contents+"\n", file, 'exec'), glob, loc)
  File "C:/Users/662176/Documents/_Projects/_makerfaire/drone_showcase/pose_recognizer/src/webapp/app.py", line 27, in <module>
    consumer_stream_name="poserec_results")
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\src\poseapp\poseapp_kinesis.py", line 41, in __init__
    self.kproducer = KinesisProducer(stream_name=produce_stream_name)
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\kinesis\producer.py", line 142, in __init__
    max_size=max_size, boto3_session=boto3_session)
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\kinesis\producer.py", line 78, in __init__
    self.start()
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 55, in start
    self.process.start()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 105, in start
    self._popen = self._Popen(self)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
    return Popen(process_obj)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'Subprocess.start.<locals>.bootstrap'
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 75, in atexit
    recursively_shutdown(cls)
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 71, in recursively_shutdown
2018-07-28 23:58:19,957 - offspring.process - MainThread - DEBUG - Shutting down <kinesis.producer.AsyncProducer object at 0x000002746D1E68D0>
    recursively_shutdown(klass)
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 74, in recursively_shutdown
    obj.shutdown()
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 85, in shutdown
    self.wait()
  File "C:\Users\662176\Documents\_Projects\_makerfaire\drone_showcase\pose_recognizer\venv\lib\site-packages\offspring\process.py", line 89, in wait
    self.process.join()
  File "C:\ProgramData\Anaconda3\lib\multiprocessing\process.py", line 123, in join
    assert self._popen is not None, 'can only join a started process'
AssertionError: can only join a started process

Will greatly appreciate any help.
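
A plausible root cause, inferred from the traceback rather than confirmed upstream: on Windows, multiprocessing uses the spawn start method, which pickles the process target, and offspring's bootstrap is a local function that cannot be pickled. A minimal standalone reproduction:

    import multiprocessing

    def start_with_local_target():
        def bootstrap():  # a local function cannot be pickled under spawn
            print("child running")

        proc = multiprocessing.Process(target=bootstrap)
        proc.start()  # raises AttributeError: Can't pickle local object ...
        proc.join()

    if __name__ == '__main__':
        # "spawn" is the default on Windows; forcing it reproduces the error anywhere.
        multiprocessing.set_start_method('spawn')
        start_with_local_target()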

Use of offspring eventually leads to too many open files if there are closed shards

While doing some testing of kinesis-python I was evaluating how well it handles live resharding. As old shards remained, kinesis-python would spin up a ShardReader for each inactive shard and then shut it down. Offspring keeps track of these instances only to clean them up on exit, so they build up over time, eventually resulting in [Errno 24] Too many open files.

Offspring looks unmaintained, but I submitted a pull request that would fix it: borgstrom/offspring#3

Pass-in and override endpoint_url in boto3 Kinesis client and DynamoDB resource

Hi,

We are looking to use this library in our project and would be happy to pick up some of the project maintenance if needed.

However, we would need to submit a PR that adds the ability to pass in an endpoint_url for both the Kinesis (consumer.KinesisConsumer) and DynamoDB (state.DynamoDB) setup, along with a session. We need this so that we can work with Localstack; see the boto3 reference below.
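
For reference, boto3 itself already accepts a custom endpoint, so the change is a matter of threading these arguments through the library (the Localstack URL below is the conventional default and is an assumption about the setup):

    import boto3

    session = boto3.Session(region_name='us-east-1')
    kinesis = session.client('kinesis', endpoint_url='http://localhost:4566')
    dynamodb = session.resource('dynamodb', endpoint_url='http://localhost:4566')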

I'll put together the PR but just wanted to check that you're open to this beforehand!

Thanks

reader_sleep_time=12 needs further explanation

I have a question regarding the sleep interval for the subprocesses in the consumer:

    reader_sleep_time=12

Does setting this variable actually slow down the consuming loop (the __iter__ overridden in the class), or does it internally set the sleep interval for the subprocess that is listening to the stream?

I could not find anything regarding that in the documentation.
Thanks.

Producer loop ignores max # of messages per put_records

Looking at the producer's loop, it appears there is no limit on the number of records per flush; only the size is checked.
With many small messages, a single flush could exceed the 500-record limit and the put_records call would fail (see the sketch below).

Have I missed something?

Thanks,
itamar
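
A minimal sketch of flushing on both limits (my own illustration, not the library's code): PutRecords accepts at most 500 records and 5 MiB of data per request, so the buffer should flush whenever either limit would be exceeded.

    MAX_RECORDS_PER_PUT = 500
    MAX_BYTES_PER_PUT = 5 * 1024 * 1024

    def batches(records):
        """Split buffered records into PutRecords-sized batches."""
        batch, batch_bytes = [], 0
        for record in records:
            size = len(record['Data']) + len(record['PartitionKey'])
            if len(batch) == MAX_RECORDS_PER_PUT or batch_bytes + size > MAX_BYTES_PER_PUT:
                yield batch
                batch, batch_bytes = [], 0
            batch.append(record)
            batch_bytes += size
        if batch:
            yield batch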

Support using enhanced fan-out for consumer

Hi! First of all, thanks for the work done on this library. Using the KCL is definitely not ideal given its dependency on the Java MultiLangDaemon...

I wanted to know if there are plans to use enhanced fan-out for the consumers. I noticed the library uses get_records() instead of subscribe_to_shard(), which limits throughput to 2 MB/s per shard, shared among all consumers.
https://docs.amazonaws.cn/en_us/streams/latest/dev/building-enhanced-consumers-api.html

Do you have plans to migrate to using enhanced fan-out for the consumers?
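
For context, a rough sketch of the enhanced fan-out flow using plain boto3 (the stream ARN, consumer name, and shard id are placeholders):

    import boto3

    client = boto3.client('kinesis')

    # One-time setup: register a dedicated consumer on the stream. The
    # consumer must reach ACTIVE status before subscribing (wait omitted here).
    consumer = client.register_stream_consumer(
        StreamARN='arn:aws:kinesis:us-east-1:123456789012:stream/my-stream',
        ConsumerName='my-fanout-consumer',
    )['Consumer']

    # Read via an HTTP/2 event stream instead of polling get_records; each
    # subscription lasts up to five minutes before it must be renewed.
    response = client.subscribe_to_shard(
        ConsumerARN=consumer['ConsumerARN'],
        ShardId='shardId-000000000000',
        StartingPosition={'Type': 'LATEST'},
    )
    for event in response['EventStream']:
        for record in event['SubscribeToShardEvent']['Records']:
            print(record['Data'])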

ShardReader retry does not back-off

This code is very helpful for understanding how to work with Kinesis Data Streams - thank you!

While sifting through it, I noticed that retries, which controls the ShardReader's back-off, is never actually incremented as far as I can tell; this line adds 1 but never saves the incremented value:

    log.debug("Retrying get_records (#%d %ds): %s", self.retries+1, loop_status, exc)

This should work:

    self.retries += 1
    log.debug("Retrying get_records (#%d %ds): %s", self.retries, loop_status, exc)

No obvious way to use VPC Interface endpoint

Maybe I'm missing something, but it looks like there's no obvious way to use this library with a VPC interface endpoint for Kinesis streams, to keep traffic internal to a VPC (and avoid the costs of sending the consumer/producer traffic over the public internet).

This would seem to be supported by Boto3's client constructor, so it should be a simple change.

    self.kinesis_client = self.boto3_session.client('kinesis', endpoint_url="https://<URL>")

If you think this would be beneficial, I am happy to send a PR.

No such file or directory: 'VERSION'

Hi there,

I ran the command pip3 install kinesis-python, but got the following error:

Collecting kinesis-python
  Downloading kinesis-python-0.0.4.tar.gz
    Complete output from command python setup.py egg_info:
    Traceback (most recent call last):
      File "<string>", line 1, in <module>
      File "/private/var/folders/b0/m1wbw4x14tn78q7vym9sd8c80000gn/T/pip-build-rinms_kk/kinesis-python/setup.py", line 3, in <module>
        with open('VERSION') as version_fd:
    FileNotFoundError: [Errno 2] No such file or directory: 'VERSION'

    ----------------------------------------
Command "python setup.py egg_info" failed with error code 1 in /private/var/folders/b0/m1wbw4x14tn78q7vym9sd8c80000gn/T/pip-build-rinms_kk/kinesis-python/

I guess the code hasn't been updated to the latest version yet.

Cheers.
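
The likely cause, inferred from the traceback rather than confirmed by the maintainers: setup.py reads a VERSION file that was not included in the sdist uploaded to PyPI (shipping the file, e.g. with an "include VERSION" line in MANIFEST.in, would be the usual fix). A hypothetical defensive variant of the failing lines:

    import os

    # Placeholder fallback version; used only when VERSION is absent from the sdist.
    version = '0.0.4'
    if os.path.exists('VERSION'):
        with open('VERSION') as version_fd:
            version = version_fd.read().strip()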

No module named state

Hello,
Please note this:

    >>> from kinesis.consumer import KinesisConsumer
    >>> from kinesis.state import DynamoDB
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named state

Regards,

Missing handling of unsuccessfully processed records

    self.client.put_records(

By default, failure of individual records within a request does not stop the processing of subsequent records in a boto3 Kinesis client put_records() request. This means the Records array returned by put_records() includes both successfully and unsuccessfully processed records. This library does not detect the unsuccessfully processed records or retry them in a subsequent call.
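
A minimal sketch of the missing handling (my own illustration, not the library's code): put_records reports per-record failures via FailedRecordCount, with ErrorCode entries in the response aligned one-to-one with the request's record order.

    import time

    def put_records_with_retry(client, stream_name, records, attempts=5):
        pending = records
        for attempt in range(attempts):
            response = client.put_records(StreamName=stream_name, Records=pending)
            if response.get('FailedRecordCount', 0) == 0:
                return
            # Keep only the records whose matching response entry failed.
            pending = [
                record
                for record, result in zip(pending, response['Records'])
                if 'ErrorCode' in result
            ]
            time.sleep(0.1 * 2 ** attempt)  # simple exponential back-off
        raise RuntimeError('%d records still failing after retries' % len(pending))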

Custom checkpointing

Do I understand it correctly that the code checkpoints as soon as an element is yielded?

    self.state.checkpoint(state_shard_id, item['SequenceNumber'])

The issue I have with this behaviour is that yielding an item does not imply it has been processed (whatever that means for me). So if my code fails after I've received a record but before I have processed it, it will be marked as processed in DynamoDB, so I will lose this piece of information.

Is my understanding correct? If so, do you think there should be a mechanism to manually checkpoint data? I don't think it's difficult to code up; it's just tricky in terms of API, since you're making shards transparent, so it's not quite clear which shard can be checkpointed at what time.
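
One possible shape for this, as a hypothetical wrapper rather than the library's code: checkpoint a record only when the consuming loop comes back for the next one, i.e. after the previous record has been fully processed.

    class DeferredCheckpointIterator:
        """Hypothetical wrapper around an iterable of (shard_id, item) pairs."""

        def __init__(self, shard_items, state):
            self.shard_items = shard_items  # e.g. records tagged with their shard
            self.state = state              # e.g. the DynamoDB state backend

        def __iter__(self):
            for shard_id, item in self.shard_items:
                yield item
                # Control returns here only once the caller has finished with
                # the yielded item and asks for the next record.
                self.state.checkpoint(shard_id, item['SequenceNumber'])

If the caller's loop body raises, the checkpoint for the in-flight record is never written, which gives at-least-once rather than at-most-once semantics.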

Ability to specify partition key

Hi! Thank you for your hard work. I was curious whether you are planning to introduce the ability to specify the partition key when putting a record.
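
For context, the underlying API already accepts an explicit key per record; the request is to expose this through the producer's put(). A plain boto3 reference (stream name and payload are placeholders):

    import json

    import boto3

    client = boto3.client('kinesis')
    client.put_record(
        StreamName='my-stream',
        Data=json.dumps({'user_id': 42}),
        PartitionKey='42',  # records sharing a key land on the same shard
    )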
