Comments (8)
Note that hard-coding offsets to 0 doesn't work well. You will see exceptions like:
kafka.common.OffsetOutOfRangeException: Request for offset 0 but we only have log segments in the range 39943 to 11471647.
at kafka.log.Log.read(Log.scala:429)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:382)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
at scala.collection.immutable.Map$Map1.map(Map.scala:93)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:573)
at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:555)
at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
at java.lang.Thread.run(Thread.java:724)
from kafka-python.
Hi @jcrobak,
it is commented on purpose and these lines relate to the version of Kafka that will support offset commits to brokers. This feature is not available in 0.8.0, which should be released very shortly.
The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?
If you look at the parameters for seek, using seek(0, 0) should set the offset to the earliest (oldest) message in Kafka. It is the right thing to do, but you need to be ready to handle the error by adjusting the offsets accordingly. Otherwise, I think you'll get stuck there and not consume anything.
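That recovery pattern can be sketched as follows. This is only an illustration: OffsetOutOfRangeError and FakeConsumer below are stand-ins for kafka.common.OffsetOutOfRangeException and kafka-python's SimpleConsumer, so the flow can be shown without a running broker.

```python
class OffsetOutOfRangeError(Exception):
    """Stand-in for kafka.common.OffsetOutOfRangeException."""

def fetch_with_rewind(consumer, fetch):
    """Try a fetch; if the requested offset has been rolled away by log
    retention, rewind to the earliest retained message and retry once."""
    try:
        return fetch()
    except OffsetOutOfRangeError:
        consumer.seek(0, 0)  # whence=0: relative to the earliest offset
        return fetch()

class FakeConsumer:
    """Tiny in-memory stand-in so the flow can run without a broker."""
    def __init__(self):
        self.offset = 0
        self.earliest = 39943  # broker's oldest retained offset

    def seek(self, offset, whence):
        if whence == 0:
            self.offset = self.earliest + offset

    def fetch(self):
        if self.offset < self.earliest:
            raise OffsetOutOfRangeError(self.offset)
        return "messages from offset %d" % self.offset

c = FakeConsumer()
result = fetch_with_rewind(c, c.fetch)
```

The key point is that the rewind happens in the exception handler, so a consumer whose hard-coded offset has been deleted by log rolling recovers instead of failing forever.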
If reprocessing messages is a problem for you, the best approach is to store (in ZooKeeper, or your favorite data store) the offsets your consumer is at and use them the next time you restart the consumer. Kafka-python does not provide this functionality, partly because the goal for Kafka is to eventually support this itself instead of relying on a third party such as ZooKeeper. The commented lines you referred to are meant exactly to support this, whenever it becomes available.
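A minimal sketch of that external bookkeeping, using a plain JSON file instead of ZooKeeper (the file name and the "topic:partition" key format are arbitrary choices for illustration, not anything kafka-python provides):

```python
import json
import os

def load_offsets(path):
    """Return the saved {"topic:partition": offset} map, or {} on first run."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return {}

def save_offsets(path, offsets):
    """Persist the offset map so the next run resumes where this one stopped."""
    with open(path, "w") as f:
        json.dump(offsets, f)

state_file = "consumer_offsets.json"  # arbitrary location for this sketch
offsets = load_offsets(state_file)
# ... consume messages here, tracking the last offset handled per partition ...
offsets["my-topic:0"] = 11471647
save_offsets(state_file, offsets)
```

On restart, load_offsets gives you the position to pass back to the consumer instead of starting from 0 or from the earliest message.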
IMHO, any consumer using Kafka needs to have checks for duplicates for robustness' sake.
Thanks for the response @mrtheb. I have a few follow-up questions...
it is commented on purpose and these lines relate to the version of Kafka that will support offset commits to brokers. This feature is not available in 0.8.0, which should be released very shortly.
Is the feature available in Apache Kafka trunk? Is that why this code exists? If that's the case, it seems like there should be a version check, and if the broker is < 0.8.1, the offset should be determined via seek(0, 0) rather than hard-coded to 0, which will not always work.
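The version check being proposed might look roughly like this. Everything here is hypothetical: broker_version is not something the library exposes, fetch_committed_offset is an invented helper standing in for a broker-side OffsetFetch, and StubConsumer exists only so the sketch runs.

```python
def initial_offset(consumer, broker_version):
    """Pick a starting offset: use the broker-stored offset when the broker
    supports it, otherwise rewind to the earliest retained message."""
    if broker_version >= (0, 8, 1):
        return consumer.fetch_committed_offset()  # hypothetical helper
    consumer.seek(0, 0)  # earliest retained message, not literal offset 0
    return consumer.offset

class StubConsumer:
    """Stand-in consumer whose log starts at offset 39943."""
    def __init__(self):
        self.offset = 0
    def seek(self, offset, whence):
        if whence == 0:  # relative to the earliest retained offset
            self.offset = 39943 + offset
    def fetch_committed_offset(self):
        return 50000  # pretend the broker had this offset stored

old_broker = initial_offset(StubConsumer(), (0, 8, 0))  # falls back to seek
new_broker = initial_offset(StubConsumer(), (0, 8, 1))  # uses stored offset
```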
The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?
As mentioned above, this seems like a bug. But it'd be a change in behavior to call seek(0, 0) rather than hard-coding to 0. Thoughts on whether that's reasonable or not? At least there should be a big warning in the docstring.
If reprocessing messages is a problem for you, the best approach is to store (in ZooKeeper, or your favorite data store) the offsets your consumer is at and use them the next time you restart the consumer. Kafka-python does not provide this functionality
Isn't the storing of offsets implemented in Consumer.commit()? Do I just need to bypass Kafka to read out the stored offsets? I'm not seeing this information being persisted in ZooKeeper, but maybe I'm doing something wrong. Or is the docstring for commit wrong?
IMHO, any consumer using Kafka needs to have checks for duplicates for robustness' sake.
Definitely, in my case it's more a matter of speed/efficiency.
OK, I see now that this is not implemented on the kafka side per the docs: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit%2FFetchAPI
It'd be great if the docs for kafka-python were updated. I can put together a PR.
My turn!
Is the feature available in Apache Kafka trunk? Is that why this code exists?
Yes, you can see in Kafka trunk that there is support for an OffsetCommitRequest. However, the library is pinned to the Kafka release, and 0.8.0 (just released, yeah!) doesn't support this yet.
Isn't the storing of offsets implemented in Consumer.commit()?
This really needs to be made clear on the front page. This method will only work against a Kafka trunk build that supports it; otherwise, it will fail. I haven't tried it personally since I am using ZooKeeper to store the offsets. I am doing this outside the library. Go ahead with the PR if you can.
I also agree it is a bug if you perform seek(0, 0) and subsequent calls to fetch data return the error you saw.
Thoughts on calling seek implicitly? It's just an opinion, but if the library were mapped directly onto the Kafka clients, there would be a ZookeeperConsumer where this would make sense. kafka-python sits halfway between the SimpleConsumer and the ZookeeperConsumer, in the sense that it does more than the first but less than the latter. I am not against the idea, but I think that doing seek implicitly would require leaving some control to the caller to modify the offsets. This is one of the areas where the Kafka ZookeeperConsumer was also criticized. You can find some info here. For this reason, I believe it is a better option to leave it out, even once OffsetCommit is fully supported.
I haven't tried it personally since I am using ZooKeeper to store the offsets. I am doing this outside the library.
How are you storing data in ZooKeeper? Are you doing something like #38?
Not exactly. I did some custom work on top of the client before this PR appeared, and I have to admit I didn't have the courage to make it generic enough to apply the recipe to kafka-python.
Also, as far as I can see, #38 only implements rebalancing, not offset commits in ZooKeeper. Still, since kazoo is integrated in that fork, you could reuse it and just override commit to save the offsets in ZK as well. I use the same ZooKeeper structure as the ZookeeperConsumerConnector does in the Java/Scala client.
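For reference, the Java/Scala ZookeeperConsumerConnector keeps each partition's offset under /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. A sketch of writing to that layout with kazoo is below; the kazoo calls need a live ZooKeeper, so they are left commented out, and the host address and values are placeholders.

```python
def zk_offset_path(group, topic, partition):
    """Offset znode path in the layout the Java/Scala client uses."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

# Hedged kazoo usage -- requires a running ZooKeeper, hence commented out:
# from kazoo.client import KazooClient
# zk = KazooClient(hosts="localhost:2181")  # placeholder address
# zk.start()
# path = zk_offset_path("my-group", "my-topic", 0)
# zk.ensure_path(path)
# zk.set(path, b"11471647")  # offsets are stored as strings
# zk.stop()

path = zk_offset_path("my-group", "my-topic", 0)
```

Using the same znode layout as the JVM client means JVM-based tooling that reads consumer offsets from ZooKeeper keeps working against a Python consumer.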
I think this issue is resolved.