
Comments (16)

twmb commented on August 24, 2024

from franz-go.

vtolstov commented on August 24, 2024

My use case is to get a slice of records that can be processed in parallel, because the offset of each partition does not depend on any other.


twmb commented on August 24, 2024

The point is that each record in the slice can be processed in parallel?
If so, this seems like a very slight variation on fetches.EachPartition. Is that function insufficient?


vtolstov commented on August 24, 2024

Mostly I don't understand how to use EachPartition to create a slice of messages for each distinct partition =)
There is also a problem if some fetched batch does not have an equal number of records for each partition.


twmb commented on August 24, 2024

Do you aim to have one (1) record per partition?

So let's say the client has a bunch of records ready for some partitions, none for others, 10 partitions total:
8 records for partition 0
9 for partition 1
0 for partition 2
6 for partition 3
1 for partition 4
0 for partition 5
0 for partition 6
30 for partition 7
3 for partition 8
9 for partition 9

Do you want only one record from each of the partitions? So you would have a slice of:

1 record for each of partitions 0, 1, 3, 4, 7, 8, and 9, for a total of 7 records?


Right now, with EachPartition, it'd be basically like...

fetches.EachPartition(func(p FetchTopicPartition) {
   doSomethingWithDistinctPartition(p.Records)
})

but, as you note, p.Records may (and likely will) have an unequal number of records per partition.


vtolstov commented on August 24, 2024

Yes, I need exactly that case: 1 record for each of partitions 0, 1, 3, 4, 7, 8, and 9, for a total of 7 records.


vtolstov commented on August 24, 2024

On the next iteration I want another slice for the distinct partitions, and the same for the next; after all iterations, when all messages from this batch are processed, I read the next batch.


twmb commented on August 24, 2024

I think this is a very inefficient way of processing messages -- what's the use case? I have an idea that may be viable to solve this, but I think I need to know a compelling use case before adding it.


vtolstov commented on August 24, 2024

The use case is parallel processing of each partition's messages.
If I return a slice that contains more than one message per partition, and the first message fails processing while the second succeeds, then committing the offset of the last message loses one event.


twmb commented on August 24, 2024

Is it not possible to process the first record of each partition, then commit that, then process the second, etc.? This would allow for the same concurrency, no?


twmb commented on August 24, 2024

It may be worth hopping on the Discord for a higher-bandwidth way to ask questions about this, but continuing on this GitHub thread is fine too!


vtolstov commented on August 24, 2024

Processing the first record of each partition in parallel is fine, but since I use this in a framework, I need to pass a slice of records -- for example, the first record of each partition; after they are processed, pass the second record of each partition.


twmb commented on August 24, 2024

Would it be possible for you to create a helper function, application side, such as:

// FirstPartitionRecords returns the first record from all partitions in all fetches,
// and returns what remains in the fetches.
func FirstPartitionRecords(fs kgo.Fetches) (kgo.Fetches, []*kgo.Record) {
  // todo
}

and then use this to drain the fetches one partition at a time? I'm still not sure how this is needed client side.
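As a rough illustration of what such an application-side helper could do, here is a self-contained sketch against plain slices and a simplified stand-in Record type (not the real kgo.Fetches API, which this thread leaves as a todo):

```go
package main

import "fmt"

// Record is a simplified stand-in for kgo.Record (topic, partition, value only).
type Record struct {
	Topic     string
	Partition int32
	Value     []byte
}

// firstPartitionRecords pops the first buffered record from every
// topic/partition in recs, returning what remains and the popped records,
// mirroring the (remaining, first) shape of the helper sketched above.
func firstPartitionRecords(recs []*Record) (rest, first []*Record) {
	seen := make(map[string]bool) // "topic/partition" -> already popped
	for _, r := range recs {
		key := fmt.Sprintf("%s/%d", r.Topic, r.Partition)
		if !seen[key] {
			seen[key] = true
			first = append(first, r)
		} else {
			rest = append(rest, r)
		}
	}
	return rest, first
}

func main() {
	recs := []*Record{
		{Topic: "t", Partition: 0, Value: []byte("a")},
		{Topic: "t", Partition: 0, Value: []byte("b")},
		{Topic: "t", Partition: 1, Value: []byte("c")},
	}
	rest, first := firstPartitionRecords(recs)
	fmt.Println(len(first), len(rest)) // 2 1
}
```

Repeatedly calling this until nothing remains drains the buffered records one partition-round at a time.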

That said, I'm going to toy around with two ideas this week:

type PollFuncChoice uint8

const (
  // PollFuncKeep keeps a record, returning it in fetches when PollFunc returns.
  PollFuncKeep PollFuncChoice = iota

  // PollFuncDiscard discards a record, advancing the client past it.
  PollFuncDiscard

  // PollFuncDiscardPartition discards the rest of the records in the current partition.
  PollFuncDiscardPartition

  // PollFuncDiscardTopic discards the rest of the records in the current topic.
  PollFuncDiscardTopic

  // PollFuncSkipPartition skips processing the current partition; all un-processed records
  // in this partition are candidates for the next poll.
  PollFuncSkipPartition

  // PollFuncSkipTopic skips processing the current topic; all un-processed records are candidates
  // for the next poll.
  PollFuncSkipTopic

  // PollFuncKeepThenSkipPartition keeps the current record, then skips processing the rest
  // of the current partition. All unprocessed records in this partition after this kept record are
  // candidates for the next poll.
  PollFuncKeepThenSkipPartition

  // PollFuncKeepThenSkipTopic keeps the current record, then skips processing the rest
  // of the current topic. All unprocessed records in this topic after this kept record are
  // candidates for the next poll.
  PollFuncKeepThenSkipTopic
)

// PollFunc waits for records to be available, and then uses fn to determine which
// records to return, which to discard, and which to skip. If the context quits,
// this function quits. If the context is nil or already canceled, this function
// will return immediately after evaluating any currently buffered records.
//
// It is important to check all partition errors,... etc. doc copied from PollFetches.
func (cl *Client) PollFunc(ctx context.Context, fn func(*Record) PollFuncChoice) Fetches {
  // todo
}

// FlattenRecords flattens all records in all fetches into one slice.
func (fs Fetches) FlattenRecords() []*Record {
 // todo
}

The combo of PollFunc and FlattenRecords would allow you to do exactly what you need. But, I want to flesh out the API a bit more to see if this would be a decent addition -- this function could also theoretically replace PollRecords, but I'd keep PollRecords for a more succinct choice. PollFunc is basically a super-user option.


twmb commented on August 24, 2024

I specced out that API, the number of constants gets quite large and the use case is probably pretty confusing to include.

What do you think about polling all fetches, then draining your local poll with the FirstPartitionRecords helper I mentioned in my last comment?

You would then pass the first record of each partition to your function, then commit each first record with CommitRecords, etc. This is basically the same thing that the client itself would do.
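A self-contained sketch of that round-based drain follows, with a simplified stand-in Record type; the commit step is only indicated in a comment, since it would go through the real client (e.g. CommitRecords):

```go
package main

import "fmt"

// Record is a simplified stand-in for kgo.Record.
type Record struct {
	Partition int32
	Offset    int64
}

// processInRounds drains buffered records one "round" at a time: each round
// takes at most one record per partition, processes those records (they
// could run in parallel), and only then would commit them before starting
// the next round. It returns the number of rounds taken.
func processInRounds(recs []*Record, process func([]*Record)) int {
	rounds := 0
	for len(recs) > 0 {
		taken := make(map[int32]bool) // partitions already used this round
		var round, rest []*Record
		for _, r := range recs {
			if !taken[r.Partition] {
				taken[r.Partition] = true
				round = append(round, r)
			} else {
				rest = append(rest, r)
			}
		}
		process(round) // then commit the round, e.g. cl.CommitRecords(ctx, round...)
		recs = rest
		rounds++
	}
	return rounds
}

func main() {
	recs := []*Record{
		{Partition: 0, Offset: 0}, {Partition: 0, Offset: 1},
		{Partition: 1, Offset: 0},
	}
	n := processInRounds(recs, func(rs []*Record) {})
	fmt.Println(n) // 2
}
```

Committing after each round bounds the loss window to one record per partition, which addresses the lost-event concern raised earlier in the thread.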


vtolstov commented on August 24, 2024

Yes, I don't think you need to bloat the code by adding such a func. I'll write a helper func and give the user the ability to return all records from a batch via a slice, or a multidimensional slice like [][]*Record partitioned by partition =)
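For reference, building the [][]*Record shape mentioned here might look like the following sketch (simplified stand-in Record type, not the real kgo.Record):

```go
package main

import "fmt"

// Record is a simplified stand-in for kgo.Record.
type Record struct {
	Partition int32
	Value     string
}

// groupByPartition builds one inner slice per partition, each preserving
// record order within that partition, yielding the [][]*Record shape.
func groupByPartition(recs []*Record) [][]*Record {
	idx := make(map[int32]int) // partition -> index into groups
	var groups [][]*Record
	for _, r := range recs {
		i, ok := idx[r.Partition]
		if !ok {
			i = len(groups)
			idx[r.Partition] = i
			groups = append(groups, nil)
		}
		groups[i] = append(groups[i], r)
	}
	return groups
}

func main() {
	recs := []*Record{
		{Partition: 0, Value: "a"},
		{Partition: 1, Value: "b"},
		{Partition: 0, Value: "c"},
	}
	groups := groupByPartition(recs)
	fmt.Println(len(groups), len(groups[0]), len(groups[1])) // 2 2 1
}
```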


twmb commented on August 24, 2024

Cool, then I'll close this!

For some posterity, and just in case I need to revisit this in the future, the API I specced out was:

diff --git a/pkg/kgo/consumer.go b/pkg/kgo/consumer.go
index f106cfe..45d744b 100644
--- a/pkg/kgo/consumer.go
+++ b/pkg/kgo/consumer.go
@@ -376,6 +376,117 @@ func (cl *Client) PollRecords(ctx context.Context, maxPollRecords int) Fetches {
 	return fetches
 }
 
+// PollFuncResult indicates how the client should process handling the
+// current record passed to your poll func.
+type PollFuncResult uint8
+
+const (
+	// PollFuncKeep keeps the current record.
+	PollFuncKeep PollFuncResult = iota
+
+	// PollFuncKeepPartition keeps the current record and the rest of the
+	// buffered records in the current record's partition.
+	PollFuncKeepPartition
+
+	// PollFuncKeepTopic keeps the current record and the rest of the
+	// buffered records in the current record's topic.
+	PollFuncKeepTopic
+
+	// PollFuncKeepRemaining keeps the current record and the rest of the
+	// buffered records, and immediately returns from PollFunc.
+	PollFuncKeepRemaining
+
+	// PollFuncDiscard discards the current record.
+	PollFuncDiscard
+
+	// PollFuncDiscardPartition discards the current record and the rest of
+	// the buffered records in the current record's partition.
+	PollFuncDiscardPartition
+
+	// PollFuncDiscardTopic discards the current record and the rest of the
+	// buffered records in the current record's topic.
+	PollFuncDiscardTopic
+
+	// PollFuncDiscardRemaining discards the current record and the rest of
+	// the buffered records and immediately returns from PollFunc.
+	PollFuncDiscardRemaining
+
+	// PollFuncSkipPartition skips the current record and the rest of the
+	// buffered records in the current record's partition. These records
+	// will be available the next time any poll function is called.
+	//
+	// There is no PollFuncSkip because, for offset consistency, if a
+	// single record is skipped, the rest of the partition must be skipped.
+	PollFuncSkipPartition
+
+	// PollFuncSkipTopic skips the current record and the rest of the buffered
+	// records in the current record's topic. These records will be
+	// available the next time any poll function is called.
+	PollFuncSkipTopic
+
+	// PollFuncSkipRemaining skips the current record, the rest of the
+	// buffered records, and immediately returns from PollFunc. All skipped
+	// records will be available the next time any poll function is called.
+	PollFuncSkipRemaining
+
+	// PollFuncKeepThenDiscardPartition keeps the current record and then
+	// discards the rest of the buffered records for the current record's
+	// partition.
+	PollFuncKeepThenDiscardPartition
+
+	// PollFuncKeepThenDiscardTopic keeps the current record and then
+	// discards the rest of the buffered records for the current record's
+	// topic.
+	PollFuncKeepThenDiscardTopic
+
+	// PollFuncKeepThenDiscardRemaining keeps the current record, discards
+	// the rest of the buffered records, and immediately returns from
+	// PollFunc.
+	PollFuncKeepThenDiscardRemaining
+
+	// PollFuncKeepThenSkipPartition keeps the current record and then
+	// skips the rest of the buffered records for the current record's
+	// partition. These records will be available the next time any poll
+	// function is called.
+	PollFuncKeepThenSkipPartition
+
+	// PollFuncKeepThenSkipTopic keeps the current record and then skips
+	// the rest of the buffered records for the current record's topic.
+	// These records will be available the next time any poll function is
+	// called.
+	PollFuncKeepThenSkipTopic
+
+	// PollFuncKeepThenSkipRemaining keeps the current record, skips the
+	// rest of the buffered records, and immediately returns from PollFunc.
+	// All skipped records will be available the next time any poll
+	// function is called.
+	PollFuncKeepThenSkipRemaining
+)
+
+// PollFunc is an advanced method for polling records. It is similar to
+// PollFetches or PollRecords, but allows for custom polling to determine which
+// records are returned (of all buffered records), as well as determining how
+// many records to return, and whether some records should be discarded
+// entirely without being returned.
+//
+// This function waits for fetches to be available, and then calls fn for every
+// record in all currently buffered fetches. Once each record is visited (or
+// skipped or discarded), this function returns. If the context quits before
+// any fetch is available, this function quits. If the context is nil or
+// already canceled, this function will run immediately against any currently
+// buffered fetches and will not wait if there are no buffered fetches.
+//
+// It is important to check all partition errors in the returned fetches. If
+// any partition has a fatal error and actually had no records, a fake fetch
+// will be injected with the error.
+//
+// If the client is closing or has closed, a fake fetch will be injected that
+// has no topic, a partition of 0, and a partition error of ErrClientClosed.
+// This can be used to detect if the client is closing and to break out of a
+// poll loop.
+func (cl *Client) PollFunc(ctx context.Context, fn func(*Record) PollFuncResult) Fetches {
+}
+

