
Comments (15)

horkhe commented on July 25, 2024

The first time kafka-pixy gets a consume request for a topic on behalf of a consumer group, it initialises the consumer group partition offsets to the head, so whatever is already in the partition won't be read by the following consume requests. Try making one consume request first; it will return the long polling timeout error, but its purpose is to initialise the consumer group offsets. Then insert messages and then run the loop.
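
Roughly, the ordering in Go would look like this. This is only a sketch: it assumes kafka-pixy's HTTP API on the default address with the /topics/<topic>/messages endpoints, and the topic and group names are placeholders; check the README for the exact paths and parameters.

    package main

    import (
        "bytes"
        "fmt"
        "net/http"
    )

    const (
        pixy  = "http://localhost:19092" // assumed default HTTP address
        topic = "my-topic"               // placeholder
        group = "my-group"               // placeholder
    )

    func consumeOnce() (*http.Response, error) {
        return http.Get(fmt.Sprintf("%s/topics/%s/messages?group=%s", pixy, topic, group))
    }

    func main() {
        // 1. One throwaway consume request. It is expected to come back with the
        //    long polling timeout error, but it initialises the group offsets.
        if resp, err := consumeOnce(); err == nil {
            resp.Body.Close()
        }

        // 2. Only now produce messages.
        if resp, err := http.Post(fmt.Sprintf("%s/topics/%s/messages", pixy, topic),
            "text/plain", bytes.NewBufferString("hello")); err == nil {
            resp.Body.Close()
        }

        // 3. Then start the consume loop; messages produced after step 1 will be
        //    returned here.
        for {
            resp, err := consumeOnce()
            if err != nil {
                break
            }
            // read resp.Body, ack, etc.
            resp.Body.Close()
        }
    }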

drjuarez commented on July 25, 2024

Thanks for your response @horkhe. So does your statement imply that if we want to use this in a system that generates messages before the proxy is running, we will always drop the first message upon consumption?

horkhe commented on July 25, 2024

One more time: when kafka-pixy gets a request to consume from a topic on behalf of a consumer group for the first time, it initialises the offsets to the head of all topic partitions. So whatever is already in the partitions won't be consumed, but all messages produced to the topic after that will be. However, if you stop consuming for a period greater than the retention configured for the __consumer_offsets system topic, the consumer group offsets stored in kafka will expire and be removed by kafka, and the following consume request will trigger reinitialisation of topic offsets to the head of all partitions.
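
If you want to see when this happens, you can inspect the group's committed offsets with Sarama's admin client. A rough sketch (broker address, topic, group, and partition are placeholders; an offset of -1 means nothing is committed, so the next consume request will re-initialise at the head):

    package main

    import (
        "fmt"
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V2_0_0_0

        admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
        if err != nil {
            log.Fatal(err)
        }
        defer admin.Close()

        // Fetch the offsets committed by the group for partition 0 of the topic.
        offsets, err := admin.ListConsumerGroupOffsets("my-group",
            map[string][]int32{"my-topic": {0}})
        if err != nil {
            log.Fatal(err)
        }
        for topic, partitions := range offsets.Blocks {
            for partition, block := range partitions {
                // -1 means no committed offset: the group never consumed, or the
                // committed offsets have expired and been removed by kafka.
                fmt.Printf("%s/%d: committed offset %d\n", topic, partition, block.Offset)
            }
        }
    }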

drjuarez commented on July 25, 2024

Thanks for helping me understand

drjuarez commented on July 25, 2024

The behaviour I am seeing on my end doesn't really line up with what you are saying, though. Starting from a freshly created topic, if I generate 5 messages and then start the ConsumeLoop, the consumer group partition offset is initialised (inconsistently) at either 0 or 1.

After initialisation and catch up to head, the consumer group offset works as expected.

If I understand you correctly, consumer group initialisation will set the offset to latest, meaning I should not read anything at all, I suppose.

For reference, I am using Sarama and relying on automatic topic creation when the first message is produced. I have confirmed all of my messages arrive in the freshly created topic using kafkacat.

horkhe commented on July 25, 2024

kafka-pixy expects a consumed topic to be present; I am not sure how creating topics on the fly affects the result. The behaviour you see is probably a race between kafka creating the topic and kafka-pixy initialising offsets for it. Just create the topic explicitly beforehand to get predictable behaviour.
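
Since you are already using Sarama, creating the topic up front is a few lines with its admin client. A rough sketch (broker address, topic name, and partition/replication settings are placeholders):

    package main

    import (
        "log"

        "github.com/Shopify/sarama"
    )

    func main() {
        cfg := sarama.NewConfig()
        cfg.Version = sarama.V2_0_0_0

        admin, err := sarama.NewClusterAdmin([]string{"localhost:9092"}, cfg)
        if err != nil {
            log.Fatal(err)
        }
        defer admin.Close()

        // Create the topic explicitly so kafka-pixy never races topic creation.
        err = admin.CreateTopic("my-topic", &sarama.TopicDetail{
            NumPartitions:     1,
            ReplicationFactor: 1,
        }, false)
        if err != nil {
            log.Fatal(err)
        }
    }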

timbunce commented on July 25, 2024

We've encountered an issue that I suspect is the same as this and we have some more details to share.

Summary of our test:

  • (the topic already exists, the topic has a single partition, the proxy is already running)
  • consume any messages waiting on the topic till idle for 5s (draining old messages)
  • call Produce 3 times to write three messages to the topic (Async: false)
  • consume any messages waiting on the topic till idle for 5s
  • expect to consume the same messages that were produced

What we're seeing is that some of the messages produced don't get consumed. They seem to be 'lost'. Sometime later, some but not all of those 'lost' messages reappear. Later still the remaining messages will appear.
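
For reference, the "consume till idle" step is roughly the following. This is not our actual test code; it assumes kafka-pixy's HTTP consume endpoint GET /topics/<topic>/messages?group=<group> on the default address, and treats a 404 (long-polling timeout) or five seconds with no message as idle.

    package main

    import (
        "fmt"
        "net/http"
        "time"
    )

    // drainUntilIdle keeps consuming until no message has arrived for `idle`.
    func drainUntilIdle(pixy, topic, group string, idle time.Duration) int {
        drained := 0
        client := &http.Client{Timeout: idle}
        deadline := time.Now().Add(idle)
        for time.Now().Before(deadline) {
            resp, err := client.Get(fmt.Sprintf("%s/topics/%s/messages?group=%s", pixy, topic, group))
            if err != nil {
                continue // client-side timeout: nothing arrived within `idle`
            }
            resp.Body.Close()
            if resp.StatusCode == http.StatusNotFound {
                continue // long-polling timeout: nothing to deliver yet
            }
            drained++
            deadline = time.Now().Add(idle) // got a message, reset the idle timer
        }
        return drained
    }

    func main() {
        n := drainUntilIdle("http://localhost:19092", "cachectl-purge-bx",
            "ussjc-bx-001.ts.example.com", 5*time.Second)
        fmt.Println("drained", n, "old messages")
    }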

Here's an annotated log showing the effect: pixy-lost-msgs-issue.log

Here's the (fairly generic?) config file: krproxyconfig.yaml.txt - the kafka and zookeeper are running locally in a container in this test setup.

It seems clear that there's some unexpected buffering happening. I'm not very familiar with Kafka or Pixy and this could easily be pilot error correctable by tuning the config or pointing out where we're being silly.

Side note, in case it's relevant: this test sends writes (produces) through the proxy. The system we're developing wouldn't do that; the pixy proxy would only be used for reads. Perhaps that would avoid the problem altogether?

timbunce commented on July 25, 2024

This is looking more like a pixy bug related to buffering on the reading (not writing) side.

Here's a sequence of runs of the test which shows a common behavior where the first message of several doesn't get returned, and now shows the message offsets:

09:58 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:71: put "1@09:58:47" - offset 186
    krproxy_test.go:71: put "2@09:58:47" - offset 187
    krproxy_test.go:32: got "2@09:58:47" - offset 187
    krproxy_test.go:71: put "3@09:58:47" - offset 188
    krproxy_test.go:32: got "3@09:58:47" - offset 188
    krproxy_test.go:57: expected [1@09:58:47 2@09:58:47 3@09:58:47] got [2@09:58:47 3@09:58:47]

09:59 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:71: put "1@09:59:18" - offset 189
    krproxy_test.go:71: put "2@09:59:18" - offset 190
    krproxy_test.go:32: got "2@09:59:18" - offset 190
    krproxy_test.go:71: put "3@09:59:18" - offset 191
    krproxy_test.go:32: got "3@09:59:18" - offset 191
    krproxy_test.go:57: expected [1@09:59:18 2@09:59:18 3@09:59:18] got [2@09:59:18 3@09:59:18]

10:01 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:73: put "1@10:01:22" - offset 192
    krproxy_test.go:32: got "2@10:01:22" - offset 193
    krproxy_test.go:73: put "2@10:01:22" - offset 193
    krproxy_test.go:73: put "3@10:01:22" - offset 194
    krproxy_test.go:32: got "3@10:01:22" - offset 194
    krproxy_test.go:57: expected [1@10:01:22 2@10:01:22 3@10:01:22] got [2@10:01:22 3@10:01:22]

The pixy log had no new entries during those runs.
At this point I waited a while.
Then the pixy log shows:

2020-03-27 10:02:09.505467 Z warning </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.p0.0> "Retrying: retryNo=1, offset=180, key=dummy" kafka.group=ussjc-bx-001.ts.example.com kafka.partition=0 kafka.topic=cachectl-purge-bx
2020-03-27 10:02:26.391817 Z info </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.0> "Topic subscription expired" kafka.group=ussjc-bx-001.ts.example.com kafka.topic=cachectl-purge-bx

Then I reran the test. This time it read a message that had been written at a much earlier offset:

10:02 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:32: got "1@09:57:28" - offset 183   <== message delayed for minutes
    krproxy_test.go:42: drained 1 old messages
    krproxy_test.go:73: put "1@10:02:32" - offset 195
    krproxy_test.go:32: got "1@10:02:32" - offset 195
    krproxy_test.go:73: put "2@10:02:32" - offset 196
    krproxy_test.go:73: put "3@10:02:32" - offset 197
    krproxy_test.go:32: got "2@10:02:32" - offset 196
    krproxy_test.go:32: got "3@10:02:32" - offset 197

The pixy log had a couple of extra lines:

2020-03-27 10:02:31.505569 Z warning </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.p0.0> "Retrying: retryNo=1, offset=183, key=dummy" kafka.group=ussjc-bx-001.ts.example.com kafka.partition=0 kafka.topic=cachectl-purge-bx
2020-03-27 10:02:32.772159 Z info </default.0/cons.0/ussjc-bx-001.ts.example.com.2/cachectl-purge-bx.0> "Resume request handling" kafka.group=ussjc-bx-001.ts.example.com kafka.topic=cachectl-purge-bx

Several minutes later, after writing all this up, I reran the test and all the old messages arrived:

10:23 $ go test ./krproxy/ -count 1 -v
    krproxy_test.go:32: got "1@09:57:06" - offset 180
    krproxy_test.go:32: got "1@09:58:47" - offset 186
    krproxy_test.go:32: got "1@09:59:18" - offset 189
    krproxy_test.go:32: got "1@10:01:22" - offset 192
    krproxy_test.go:42: drained 4 old messages
    krproxy_test.go:32: got "1@10:23:25" - offset 198
    krproxy_test.go:73: put "1@10:23:25" - offset 198
    krproxy_test.go:73: put "2@10:23:25" - offset 199
    krproxy_test.go:73: put "3@10:23:25" - offset 200
    krproxy_test.go:32: got "2@10:23:25" - offset 199
    krproxy_test.go:32: got "3@10:23:25" - offset 200
    krproxy_test.go:59: TestRoundtrip done
PASS

So it seems that some part of pixy is reading some messages and holding on to them for some time while other messages flow freely, before then releasing them later.

timbunce commented on July 25, 2024

Random observations...

  • the config proxies.default.kafka.version was 0.10.2.1 (the default).
  • Our kafka is 2.2.1. The config file says "Supported versions are 0.10.2.1 - 2.0.0".
  • Setting ...kafka.version to 2.0.0 makes no apparent change in behaviour.
  • Pixy hangs on shutdown (has to be killed) after even a single successful test run.

horkhe commented on July 25, 2024

You have long_polling_timeout: 72h, which is way too much. You should set it to a few seconds. Your consumer will then periodically get gRPC status 5 or HTTP 404 Not Found (depending on the client you are using) and should handle it. Otherwise your application will likely hit its own internal request timeout, probably resetting the connection, and consumed messages can be lost during those resets.
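
With a short long_polling_timeout, handling that periodic status in the consume loop is trivial. A rough HTTP sketch (address, topic, and group are placeholders; with the gRPC client the equivalent is treating status code 5, NOT_FOUND, the same way):

    package main

    import (
        "fmt"
        "io/ioutil"
        "net/http"
    )

    func main() {
        url := "http://localhost:19092/topics/my-topic/messages?group=my-group"
        for {
            resp, err := http.Get(url)
            if err != nil {
                fmt.Println("transport error:", err)
                continue
            }
            body, _ := ioutil.ReadAll(resp.Body)
            resp.Body.Close()
            if resp.StatusCode == http.StatusNotFound {
                // Long polling timeout expired with nothing to deliver.
                // Not an error: just issue the next consume request.
                continue
            }
            fmt.Printf("got message: %s\n", body)
        }
    }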

timbunce commented on July 25, 2024

You have long_polling_timeout: 72h this is way too much. You should have it set to a few seconds.

Interesting, thanks. A different developer did the initial work. I'd presumed the config file was generic.

But that can't be the cause of the behaviour, as my recent testing has used no config file at all. The test starts the proxy itself, passing just the needed -*Addr and -*Peers flags.

I've noticed something I think is relevant... it seems the lost messages correlate with more than one member being listed in the "Fetched subscriptions" entries in the log:

2020-03-27 17:19:30.997636 Z info </_.0/cons.0/ussjc-bx-001.ts.example.com.0/member.0> "Fetched subscriptions: {
    kp_TimBunceA2.local_89648: [cachectl-purge-bx]
    kp_TimBunceA2.local_89679: [cachectl-purge-bx]
}" kafka.group=ussjc-bx-001.ts.example.com

So I'd guess that the 'delayed' messages were read by a previous subscription, so they aren't read by the new subscription. Then, when the previous subscription times out, the old messages are 'released'. Does that sound about right?

Would a short consumer.subscription_timeout setting be the right approach?

horkhe commented on July 25, 2024

So you have two kafka-pixy instances running in your test, kp_TimBunceA2.local_89648 and kp_TimBunceA2.local_89679, and they compete for the single partition.

timbunce commented on July 25, 2024

That makes sense. We don't have two running but have been starting and stopping kafka-pixy on each test run. We found it doesn't terminate so it's been killed. See #183. (Note that the initial symptoms were encountered with one long-running instance of kafka-pixy so I'm doubtful this is the whole story.) I'll change the test to leave kafka-pixy running and get back to you. Thanks.

horkhe commented on July 25, 2024

I am 100% confident that there is no problem with kafka-pixy; it has been in production at a very large scale for a long time. It is most likely a problem with your configuration or your client. Kafka-pixy comes with testconsumer and testproducer scripts. I have just modified them to add a verbose flag so they can produce results similar to your test app's. Could you please use them in your tests?

EDIT:
Clone the repo and run go install ./...; that will add both scripts to your GOPATH/bin.

timbunce commented on July 25, 2024

Thanks for the updates @horkhe.

I can't reproduce the problem using the testconsumer and testproducer scripts. I also can't reproduce it with our test script with our current client code and a long-lived kafka-pixy process.

I'm happy to put our issues down to pilot error. Feel free to close the case, unless @drjuarez has any further concerns.

Thank you for your patience.
