Comments (8)

jcrobak commented on July 23, 2024

Note that hard-coding offsets to 0 doesn't work well. You will see exceptions like:

kafka.common.OffsetOutOfRangeException: Request for offset 0 but we only have log segments in the range 39943 to 11471647.
        at kafka.log.Log.read(Log.scala:429)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:382)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:327)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:323)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:323)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:573)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:555)
        at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.scala:216)
        at java.lang.Thread.run(Thread.java:724)

from kafka-python.

mrtheb commented on July 23, 2024

Hi @jcrobak,

It is commented out on purpose; these lines relate to the version of Kafka that will support committing offsets to brokers. This feature is not available in 0.8.0, which should be released very shortly.

The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?

If you look at parameters for seek, using seek(0, 0) should set the offsets to the earliest (oldest) message in kafka. It is the right thing to do but you need to be ready to handle the error by adjusting the offsets accordingly. Otherwise, I think you'll get stuck there and not consume anything.
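The recovery described above, catching the out-of-range error and rewinding with seek(0, 0), can be sketched as a small wrapper. Note this is illustrative only: the exception class is defined locally as a stand-in for the broker error, and StubConsumer fakes kafka-python's consumer interface rather than using its real API.

```python
# Hedged sketch: exception class and consumer are stand-ins, not kafka-python's API.
class OffsetOutOfRangeException(Exception):
    """Stand-in for the broker error raised when our offset was rolled away."""

def fetch_with_reset(consumer):
    """Try to fetch; on an out-of-range offset, rewind to earliest and retry once."""
    try:
        return consumer.get_messages()
    except OffsetOutOfRangeException:
        # seek(0, 0): position at offset 0 relative to the earliest available message
        consumer.seek(0, 0)
        return consumer.get_messages()

class StubConsumer:
    """Fake consumer: raises once (as a broker with rolled logs would), then yields."""
    def __init__(self):
        self._out_of_range = True
    def seek(self, offset, whence):
        self._out_of_range = False  # pretend the rewind fixed our position
    def get_messages(self):
        if self._out_of_range:
            raise OffsetOutOfRangeException()
        return ["msg-39943", "msg-39944"]
```

Without the except branch, the consumer would be stuck retrying an offset the broker has already deleted.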

If reprocessing messages is a problem for you, the best approach is to store (in Zookeeper, or your favorite data store) the offsets where your consumer is at and use them the next time you restart the consumer. Kafka-python does not provide this functionality, partly because the goal for Kafka is to eventually support this itself instead of relying on a third party such as Zookeeper. The commented lines you referred to are meant exactly to support this, whenever it becomes available.
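The "favorite data store" can be as simple as a JSON file written between runs. A minimal sketch (not part of kafka-python; the "topic:partition" key encoding is just an illustration):

```python
import json
import os

def save_offsets(path, offsets):
    """Persist {(topic, partition): offset} as JSON, e.g. after each batch."""
    with open(path, "w") as f:
        json.dump({"%s:%d" % (t, p): o for (t, p), o in offsets.items()}, f)

def load_offsets(path):
    """Load offsets saved by a previous run; empty dict on a first start."""
    if not os.path.exists(path):
        return {}
    with open(path) as f:
        raw = json.load(f)
    return {(k.rsplit(":", 1)[0], int(k.rsplit(":", 1)[1])): o
            for k, o in raw.items()}
```

On restart you would load these and seek each partition to its saved offset instead of starting from 0.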

IMHO, any consumer using Kafka needs to check for duplicates for robustness' sake.

jcrobak commented on July 23, 2024

Thanks for the response @mrtheb. I have a few follow-up questions...

It is commented out on purpose; these lines relate to the version of Kafka that will support committing offsets to brokers. This feature is not available in 0.8.0, which should be released very shortly.

Is the feature available in Apache Kafka trunk, is that why this code exists? If that's the case, it seems like there should be a version check, and if it's < 0.8.1, then the offset should be determined via seek(0, 0) rather than hard-coded to 0, which will not always work.

The error you are getting in the second comment is because Kafka has rolled (deleted) the file which contained the offset you are trying to fetch. Did you get this error after using seek(0, 0)?

As mentioned above, this seems like a bug. But it'd be a change in behavior to call seek(0, 0) rather than hard-coding to 0. Thoughts on if that's reasonable or not? At least there should be a big warning in the docstring.

If reprocessing messages is a problem for you, the best approach is to store (in zookeeper, or your favorite data store) the offsets where your consumer is at and use it the next time you restart the consumer. Kafka-python does not provide this functionality

Isn't the storing of offsets implemented in Consumer.commit()? Do I just need to bypass kafka to read out the stored offsets? I'm not seeing this information being persisted in zookeeper, but maybe I'm doing something wrong. Or is the docstring for commit wrong?

IMHO, any consumer using Kafka needs to check for duplicates for robustness' sake.

Definitely, in my case it's more a matter of speed/efficiency.

jcrobak commented on July 23, 2024

OK, I see now that this is not implemented on the kafka side per the docs: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit%2FFetchAPI

It'd be great if the docs for kafka-python were updated. I can put together a PR.

mrtheb commented on July 23, 2024

My turn!

Is the feature available in Apache Kafka trunk, is that why this code exists?

Yes, you can see in the Kafka trunk that there is support for an OffsetCommitRequest. However, the library is pinned to the Kafka release, and 0.8.0 (just released, yeah!) doesn't support this yet.

Isn't the storing of offsets implemented in Consumer.commit()?

This really needs to be made clear on the front page. This method will only work against a Kafka trunk build that supports it; otherwise, it will fail. I haven't tried it personally since I am using Zookeeper to store the offsets. I am doing this outside the library. Go ahead with the PR if you can.

I also think it is a bug if you perform seek(0, 0) and subsequent calls to fetch data still return the error you saw.

Thoughts on calling seek implicitly? It is just an opinion but, if the library were mapped directly onto the Kafka clients, there would be a ZookeeperConsumer where this would make sense. kafka-python is halfway between the SimpleConsumer and ZookeeperConsumer, in the sense that it does more than the first but less than the latter. I am not against the idea, but I think that doing seek implicitly would require leaving some control to the caller to modify the offsets. This is one of the areas where the Kafka ZookeeperConsumer was also criticized. You can find some info here. For this reason, I believe it is a better option to leave it out, even once OffsetCommit is fully supported.

jcrobak commented on July 23, 2024

I haven't tried personally since I am using Zookeeper to store the offsets. I am doing this outside the library.

How are you storing data in zookeeper? Are you doing something like #38?

mrtheb commented on July 23, 2024

Not exactly, I did some custom work on top of the client before this PR appeared and I have to admit I didn't have the courage to make it generic enough and apply the recipe to kafka-python.

Also, as far as I could see, #38 only implements rebalance and not offset commit in zookeeper. Still, since kazoo is integrated in this fork, you could reuse it and just override commit to save the offsets in ZK as well. I use the same Zookeeper structure as the ZookeeperConsumerConnector does in the Java/Scala client.
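The ZookeeperConsumerConnector layout stores each partition's offset under /consumers/&lt;group&gt;/offsets/&lt;topic&gt;/&lt;partition&gt;. A hedged sketch of an overridden commit writing to that layout; ensure_path() and set() mirror kazoo's KazooClient methods, but the wiring here is illustrative, not code from this fork:

```python
# Sketch under assumptions: `zk` is any kazoo-style client exposing
# ensure_path() and set(); the path layout matches the Java/Scala
# ZookeeperConsumerConnector.
def zk_offset_path(group, topic, partition):
    """Offset node path used by the ZookeeperConsumerConnector."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def commit_offsets_to_zk(zk, group, topic, offsets):
    """Write each partition's offset ({partition: offset}) to Zookeeper."""
    for partition, offset in offsets.items():
        path = zk_offset_path(group, topic, partition)
        zk.ensure_path(path)  # create intermediate nodes if needed
        zk.set(path, str(offset).encode("utf-8"))
```

Because the layout matches the Java/Scala client, offsets written this way remain readable by its tooling.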

rdiomar commented on July 23, 2024

I think this issue is resolved.
