dp-kafka

Kafka client wrapper using channels to abstract Kafka consumers and producers. This library is built on top of Sarama.

Life-cycle

Creation

Kafka producers and consumers can be created with constructors that accept the required channels and configuration. You may create the channels using CreateProducerChannels and CreateConsumerChannels respectively:

	// Create Producer with channels and config
	pChannels := kafka.CreateProducerChannels()
	pConfig := &kafka.ProducerConfig{MaxMessageBytes: &cfg.KafkaMaxBytes}
	producer, err := kafka.NewProducer(ctx, cfg.Brokers, cfg.ProducedTopic, pChannels, pConfig)
	// Create ConsumerGroup with channels and config
	cgChannels := kafka.CreateConsumerGroupChannels(cfg.KafkaParallelMessages)
	cgConfig := &kafka.ConsumerGroupConfig{KafkaVersion: &cfg.KafkaVersion}
	cg, err := kafka.NewConsumerGroup(ctx, cfg.Brokers, cfg.ConsumedTopic, cfg.ConsumedGroup, cgChannels, cgConfig)

For consumers, you can specify the batch size, which determines the number of messages buffered in the Upstream channel. It is recommended to set the batch size equal to the number of messages that are consumed in parallel.

You can provide an optional config parameter to the constructor (ProducerConfig or ConsumerGroupConfig). Any provided values override the default Sarama config; pass nil to use the default Sarama config unchanged.

The constructor tries to initialise the producer/consumer by creating the underlying Sarama client. Failing to initialise is not considered a fatal error, so the constructor will not return an error in that case.

Please note that if you do not provide the necessary channels, the constructors return an ErrNoChannel error, which must be considered fatal.

Initialisation

If the producer/consumer can establish a connection with the Kafka cluster, it will be initialised at creation time, which is usually the case. It might not be able to do so, however, for example if the Kafka cluster is not running. A producer/consumer that is not initialised cannot contact the Kafka broker, and cannot send or receive any messages. Any attempt to send a message in this state will result in an error being sent to the Errors channel.

An uninitialised producer/consumer will keep trying to initialise itself asynchronously, in a retry loop that follows an exponential backoff strategy. You may also trigger a retry manually by calling Initialise(). Either way, once initialisation succeeds, the retry loop exits and the producer/consumer starts producing/consuming.

You can check if a producer/consumer is initialised by calling IsInitialised() or wait for it to be initialised by waiting for the Ready channel to be closed, like so:

	// wait in a parallel go-routine
	go func() {
		<-channels.Ready
		doStuff()
	}()
	// block until kafka is initialised
	<-channels.Ready
	doStuff()

Waiting for this channel is a convenient hook, but not a necessary requirement.

Message production

Messages are sent to Kafka by writing them to the producer's Output channel as byte slices:

	// send message
	pChannels.Output <- []byte(msg)

Message consumption

Messages can be consumed by creating an infinite consumption loop. Once a message has been fully processed, you need to call Commit(): this releases the Sarama go-routine that delivered the message and tells Kafka that the message does not need to be delivered again (the message is marked and its offset is committed):

	// consumer loop
	func consume(upstream chan kafka.Message) {
		for {
			msg := <-upstream
			doStuff(msg)
			msg.Commit()
		}
	}

You may create a single go-routine to consume messages sequentially, or multiple parallel go-routines (workers) to consume them concurrently:

	// single consume go-routine
	go consume(channels.Upstream)
	// multiple workers to consume messages in parallel
	for w := 1; w <= cfg.KafkaParallelMessages; w++ {
		go consume(channels.Upstream)
	}

You can consume up to as many messages in parallel as there are partitions assigned to your consumer; more info in the deep dive section below.

Message consumption deep dive

Sarama creates as many go-routines as there are partitions assigned to the consumer for the topic being consumed.

For example, if a topic has 60 partitions and 2 instances of a service consuming that topic are running at the same time, Kafka will assign 30 partitions to each instance.

Sarama will then create 30 parallel go-routines, which this library uses to send messages to the Upstream channel. Each go-routine waits for its message to finish being processed by waiting for the message-specific upstreamDone channel to be closed, like so:

	channels.Upstream <- msg
	<-msg.upstreamDone

Each Sarama consumption go-routine exists only during a particular session. Sessions are periodically destroyed and created by Sarama in response to Kafka events such as a cluster rebalance (where the number of partitions assigned to a consumer may change). It is important that messages are released as soon as possible when this happens; in this scenario the default message consumption timeout is 10 seconds (determined by config.Consumer.Group.Session.Timeout).

When a session finishes, we call Consume() again, which tries to establish a new session. If an error occurs trying to establish a new session, it will be retried following an exponential backoff strategy.

Closing

Producers can be closed by calling the Close method.

For graceful closing of consumers, it is advised to call the StopListeningToConsumer method prior to the Close method. This allows in-flight messages to be processed to completion and committed, so that they do not get replayed once the application restarts.

The Closer channel is used to signal to all the loops that they need to exit because the consumer is being closed.

After successfully closing a producer or consumer, the corresponding Closed channel is closed.

Health-check

The health status of a consumer or producer can be obtained by calling Checker method, which updates the provided CheckState structure with the relevant information:

	err := cli.Checker(ctx, check)

  • If a broker cannot be reached, the Status is set to CRITICAL.
  • If all brokers can be reached, but a broker does not provide the expected topic metadata, the Status is set to WARNING.
  • If all brokers can be reached and return the expected topic metadata, we try to initialise the consumer/producer. If it was already initialised, or the initialisation is successful, the Status is set to OK.

Examples

See the examples for some typical usages of this library.

Testing

Some mocks are provided so that you can test your code's interactions with this library. More details here.
