confluent-kafka-go's Introduction

Confluent's Golang Client for Apache Kafka™

confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.

Features:

  • High performance - confluent-kafka-go is a lightweight wrapper around librdkafka, a finely tuned C client.

  • Reliability - There are a lot of details to get right when writing an Apache Kafka client. We get them right in one place (librdkafka) and leverage this work across all of our clients (also confluent-kafka-python and confluent-kafka-dotnet).

  • Supported - Commercial support is offered by Confluent.

  • Future proof - Confluent, founded by the creators of Kafka, is building a streaming platform with Apache Kafka at its core. It's high priority for us that client features keep pace with core Apache Kafka and components of the Confluent Platform.

The Golang bindings provide a high-level Producer and Consumer with support for the balanced consumer groups of Apache Kafka 0.9 and above.

See the API documentation for more information.

For a step-by-step guide on using the client see Getting Started with Apache Kafka and Golang.

Examples

High-level balanced consumer

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
		"auto.offset.reset": "earliest",
	})

	if err != nil {
		panic(err)
	}

	err = c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
	if err != nil {
		panic(err)
	}

	// A signal handler or similar could be used to set this to false to break the loop.
	run := true

	for run {
		msg, err := c.ReadMessage(time.Second)
		if err == nil {
			fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
		} else if !err.(kafka.Error).IsTimeout() {
			// The client will automatically try to recover from all errors.
			// Timeout is not considered an error because it is raised by
			// ReadMessage in absence of messages.
			fmt.Printf("Consumer error: %v (%v)\n", err, msg)
		}
	}

	c.Close()
}

Producer

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
	if err != nil {
		panic(err)
	}

	defer p.Close()

	// Delivery report handler for produced messages
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
				}
			}
		}
	}()

	// Produce messages to topic (asynchronously)
	topic := "myTopic"
	for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Value:          []byte(word),
		}, nil)
	}

	// Wait for message deliveries before shutting down
	p.Flush(15 * 1000)
}

More elaborate examples are available in the examples directory, including how to configure the Go client for use with Confluent Cloud.

Getting Started

Supports Go 1.17+ and librdkafka 2.4.0+.

Using Go Modules

You can use Go Modules to install confluent-kafka-go.

Import the kafka package from GitHub in your code:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

Build your project:

go build ./...

If you are building for Alpine Linux (musl), -tags musl must be specified.

go build -tags musl ./...

A dependency on the latest stable version of confluent-kafka-go should be automatically added to your go.mod file.

Install the client

Manual install:

go get -u github.com/confluentinc/confluent-kafka-go/v2/kafka

Golang import:

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

librdkafka

Prebuilt librdkafka binaries are included with the Go client and librdkafka does not need to be installed separately on the build or target system. The following platforms are supported by the prebuilt librdkafka binaries:

  • macOS x64 and arm64
  • glibc-based Linux x64 and arm64 (e.g., RedHat, Debian, CentOS, Ubuntu) - without GSSAPI/Kerberos support
  • musl-based Linux x64 and arm64 (Alpine) - without GSSAPI/Kerberos support
  • Windows x64 - without GSSAPI/Kerberos support

When building your application for Alpine Linux (musl libc) you must pass -tags musl to go get, go build, etc.

CGO_ENABLED must NOT be set to 0 since the Go client is based on the C library librdkafka.

If GSSAPI/Kerberos authentication support is required you will need to install librdkafka separately, see the Installing librdkafka chapter below, and then build your Go application with -tags dynamic.

Installing librdkafka

If the bundled librdkafka build is not supported on your platform, or you need a librdkafka with GSSAPI/Kerberos support, you must install librdkafka manually on the build and target system using one of the following alternatives:

  • For Debian and Ubuntu based distros, install librdkafka-dev from the standard repositories or using Confluent's Deb repository.
  • For Redhat based distros, install librdkafka-devel using Confluent's YUM repository.
  • For macOS, install librdkafka from Homebrew. You may also need to brew install pkg-config if you don't already have it: brew install librdkafka pkg-config.
  • For Alpine: apk add librdkafka-dev pkgconf
  • For Windows: there are no official/supported packages, but static builds are included for Windows/x64. Installing from source is needed only for GSSAPI/Kerberos support.
  • For source builds, see instructions below.

Build from source:

git clone https://github.com/confluentinc/librdkafka.git
cd librdkafka
./configure
make
sudo make install

After installing librdkafka you will need to build your Go application with -tags dynamic.

Note: If you use the master branch of the Go client, then you need to use the master branch of librdkafka.

confluent-kafka-go requires librdkafka v2.4.0 or later.

Static builds on Linux

Since we are using cgo, Go builds a dynamically linked library even when using the prebuilt, statically-compiled librdkafka as described in the librdkafka chapter.

For glibc-based systems, if the system where the client is compiled differs from the target system, and especially when the target system is older, the compiled client can fail at runtime with a glibc version error.

Unfortunately, building a statically linked binary does not solve the problem, since there is no way to have truly static builds using glibc: some functions in glibc, like getaddrinfo, need the shared version of the library even when the code is compiled statically.

One way around this is to either use a container/VM to build the binary, or install an older version of glibc on the system where the client is being compiled.

The other way is to use musl to create truly static builds for Linux. To do this, install musl for your system.

Static compilation command, meant to be used alongside the prebuilt librdkafka bundle:

CC=/path/to/musl-gcc go build --ldflags '-linkmode external -extldflags "-static"' -tags musl

API Strands

The recommended API strand is the Function-Based one; the Channel-Based one is documented in examples/legacy.

Function-Based Consumer

Messages, errors and events are polled through the consumer.Poll() function.

It maps directly to the underlying librdkafka functionality.

See examples/consumer_example
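
For illustration, a minimal Poll-based consume loop might look like the following sketch (error handling trimmed; the loop runs until the process is killed):

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost",
		"group.id":          "myGroup",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err = c.SubscribeTopics([]string{"myTopic"}, nil); err != nil {
		panic(err)
	}

	for {
		// Poll blocks for up to 100 ms and returns a single event, or nil on timeout.
		switch e := c.Poll(100).(type) {
		case *kafka.Message:
			fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))
		case kafka.Error:
			fmt.Fprintf(os.Stderr, "Error: %v\n", e)
		}
	}
}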

Function-Based Producer

The application calls producer.Produce() to produce messages. Delivery reports are emitted on the producer.Events() channel or on a private channel specified per Produce() call.

Warnings

  • Produce() is a non-blocking call; if the internal librdkafka queue is full the call will fail and can be retried, as in the sketch below.
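
A minimal retry sketch (the helper name and the 100 ms flush backoff are assumptions, not project recommendations):

// produceWithRetry is a hypothetical helper that retries Produce while the
// internal librdkafka queue is full and returns any other error as-is.
func produceWithRetry(p *kafka.Producer, msg *kafka.Message) error {
	for {
		err := p.Produce(msg, nil)
		if err == nil {
			return nil
		}
		if kerr, ok := err.(kafka.Error); ok && kerr.Code() == kafka.ErrQueueFull {
			// Let outstanding delivery reports drain, freeing queue space.
			p.Flush(100)
			continue
		}
		return err
	}
}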

See examples/producer_example

License

Apache License v2.0

KAFKA is a registered trademark of The Apache Software Foundation and has been licensed for use by confluent-kafka-go. confluent-kafka-go has no affiliation with and is not endorsed by The Apache Software Foundation.

Developer Notes

See kafka/README

Contributions to the code, examples, documentation, etc., are very much appreciated.

Make your changes, run gofmt and the tests, push your branch, create a PR, and sign the CLA.

Confluent Cloud

For a step-by-step guide on using the Golang client with Confluent Cloud see Getting Started with Apache Kafka and Golang on Confluent Developer.


confluent-kafka-go's Issues

Segfault on the Consumer when partitions are added to the cluster

Description

A) With the Consumer (confluent-kafka-go and librdkafka from HEAD), there is a segfault when partitions are added to a topic.
B) Besides segfaulting, the consumers do not pick up the new partitions on their own (without changing the consumer group).

Is B) expected behavior? Can we fix A)?

Thanks for your work on the Kafka libraries.

How to reproduce

On the broker, create a topic with 2 partitions:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --topic topic --create --replication-factor 1 --partitions 2

Run 2 Consumers with:
confluent-kafka-go/examples/consumer_channel_example/consumer_channel_example.go

On the broker, change the topic configuration with:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper localhost:2181 --topic topic --alter --partitions 4

Observe 1)
No Consumer picked up the new partitions

Stop 1 Consumer with <CTRL+C>

Observe 2)
The other Consumer crashes with a Segfault.
Before the Segfault it logs "PARTCNT|[thrd:main]: Topic topic partition count changed from 2 to 4"

Checklist

Please provide the following information:

  • confluent-kafka-go version: f696f02
  • librdkafka version: a85c798
  • Apache Kafka version: kafka_2.12-0.11.0.0
  • Operating system: CentOS
  • librdkafka client configuration: see confluent-kafka-go/examples/consumer_channel_example/consumer_channel_example.go

'testing' package is imported in non-test file

Given the following program:

package main

import (
	"flag"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	flag.Parse()
	kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "foo"})
}

we get the following help output:

$ ./foo --help
Usage of ./foo:
  -test.bench regexp
        run only benchmarks matching regexp
  -test.benchmem
        print memory allocations for benchmarks
  -test.benchtime d
        run each benchmark for duration d (default 1s)
  -test.blockprofile file
        write a goroutine blocking profile to file
  -test.blockprofilerate rate
        set blocking profile rate (see runtime.SetBlockProfileRate) (default 1)
  -test.count n
        run tests and benchmarks n times (default 1)
  -test.coverprofile file
        write a coverage profile to file
  -test.cpu list
        comma-separated list of cpu counts to run each test with
  -test.cpuprofile file
        write a cpu profile to file
  -test.memprofile file
        write a memory profile to file
  -test.memprofilerate rate
        set memory profiling rate (see runtime.MemProfileRate)
  -test.mutexprofile string
        write a mutex contention profile to the named file after execution
  -test.mutexprofilefraction int
        if >= 0, calls runtime.SetMutexProfileFraction() (default 1)
  -test.outputdir dir
        write profiles to dir
  -test.parallel n
        run at most n tests in parallel (default 4)
  -test.run regexp
        run only tests and examples matching regexp
  -test.short
        run smaller test suite to save time
  -test.timeout d
        fail test binary execution after duration d (0 means unlimited)
  -test.trace file
        write an execution trace to file
  -test.v
        verbose: print additional output

This is because the testing package is imported by testhelpers.go, so any program that imports the driver effectively imports the testing package as well.

The solution would be for the testing helpers to be moved to a separate package which is then imported in the _test.go files.

confluent-kafka-go v0.9.4

Can I intercept logs?

Log messages from librdkafka seem to go directly to stdout. Is it possible to intercept these? Seems like librdkafka provides log_cb for this purpose.
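
For reference, later releases of the Go client can route librdkafka logs to a Go channel instead of stderr via the go.logs.channel.enable option; a minimal sketch:

c, err := kafka.NewConsumer(&kafka.ConfigMap{
	"bootstrap.servers":      "localhost",
	"group.id":               "myGroup",
	"go.logs.channel.enable": true, // route librdkafka logs to c.Logs()
})
if err != nil {
	panic(err)
}

// Forward librdkafka log events to your own logger instead of stderr.
go func() {
	for logEvent := range c.Logs() {
		fmt.Printf("%d %s [%s] %s\n", logEvent.Level, logEvent.Tag, logEvent.Name, logEvent.Message)
	}
}()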

Build fails on raspberry pi

I have a Raspberry Pi running Raspbian, with Go 1.8.3 installed and librdkafka installed according to the instructions (latest version from the repository). After downloading this project I tried to compile the producer channel example, but Go complained about the kafka/message.go file -- there are two errors on line 213.
kafka/message.go:213 constant 2147483648 overflows int
kafka/message.go:213 array bound is too large
Could this be a bug?

consumer: Async offset commits

Hi @edenhill.

Async commits are supported by librdkafka and the Python driver built on top of it, but not by confluent-kafka-go.

This would be useful in our case: we manually commit the offsets, but don't want to do this synchronously since this would cause performance issues with a big number of very fast jobs. So we have to implement our own batching logic to step around this issue. It would be nice if this was implemented right in the driver instead.

Are there any plans to support such a use case?

Thanks!

Failed to resolve

I have a problem when I try to connect to the Kafka broker on localhost, port 9092:

Failed to resolve '9c95dafd9323:9092': Name or service not known

I don't understand why localhost is being resolved as 9c95dafd9323 (it's no surprise that this fails).

Can you help me please?

Thanks in advance

Fail fast on librdkafka connection error

When a Kafka connection fails in librdkafka, this information should propagate up to confluent-kafka-go, so that applications can decide to fail fast, reconnect, etc.

Right now, librdkafka reports connection errors to the console, but the confluent-kafka-go application continues as if everything is just fine.
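
In the meantime, transport-level failures do surface as kafka.Error events on the client's Events() channel; a minimal fail-fast sketch, assuming a producer p (the exit policy is an application choice):

go func() {
	for e := range p.Events() {
		if err, ok := e.(kafka.Error); ok {
			fmt.Fprintf(os.Stderr, "Kafka error: %v\n", err)
			if err.Code() == kafka.ErrAllBrokersDown {
				os.Exit(1) // or trigger reconnect/alerting logic instead
			}
		}
	}
}()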

auto_offset_reset doesn't change much

I am trying to consume the latest messages using:

kafka.ConfigMap{"auto.offset.reset": "latest"}

It seems that it doesn't affect anything and still reads from the last available offset.

What I need is for it to always start consuming NEW messages and ignore the previous offset.
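
Note that auto.offset.reset only applies when the group has no committed offset. To always start at the end of the log regardless of committed offsets, one option is to assign partitions explicitly; a sketch, assuming a consumer c and placeholder topic/partition values:

topic := "myTopic"
err := c.Assign([]kafka.TopicPartition{
	// OffsetEnd starts at the current end of the partition, ignoring commits.
	{Topic: &topic, Partition: 0, Offset: kafka.OffsetEnd},
})
if err != nil {
	panic(err)
}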

Expose the librdkafka stats callback in the go client?

I was reading through the librdkafka docs and noticed there's a stats_cb mentioned in those docs but grep'ing the golang client codebase doesn't yield any hits on this. Is it planned to expose that to the go client?

thanks
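
For reference, later releases expose this: when statistics.interval.ms is set, the client emits *kafka.Stats events on the Events channel; a minimal sketch:

p, err := kafka.NewProducer(&kafka.ConfigMap{
	"bootstrap.servers":      "localhost",
	"statistics.interval.ms": 5000, // emit *kafka.Stats every 5 s
})
if err != nil {
	panic(err)
}

go func() {
	for e := range p.Events() {
		if stats, ok := e.(*kafka.Stats); ok {
			// String() returns the raw librdkafka statistics JSON.
			fmt.Println(stats.String())
		}
	}
}()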

Work around librdkafka DNS limitation

Could confluent-kafka-go guard against mDNS failures in the librdkafka code path, which result in .local-style broker addresses failing to connect in some environments?

One way to do this is to resolve the configured bootstrap server addresses, using the pure Go resolver, before handing this configuration to librdkafka.
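
A hypothetical pre-resolution helper along those lines (note that handing raw IPs to librdkafka can break TLS hostname verification, so this is only a sketch):

import (
	"fmt"
	"net"
	"strings"
)

// resolveBootstrap resolves each host in a bootstrap.servers string with the
// pure Go resolver and substitutes the first returned address.
func resolveBootstrap(servers string) (string, error) {
	var out []string
	for _, hp := range strings.Split(servers, ",") {
		host, port, err := net.SplitHostPort(strings.TrimSpace(hp))
		if err != nil {
			return "", err
		}
		addrs, err := net.LookupHost(host)
		if err != nil {
			return "", fmt.Errorf("resolving %s: %v", host, err)
		}
		out = append(out, net.JoinHostPort(addrs[0], port))
	}
	return strings.Join(out, ","), nil
}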

go build -tags static

Hi,

I'm having trouble building a binary with librdkafka as a static library.

$ go build -tags static -ldflags -v bin/blu.go
# command-line-arguments
HEADER = -H4 -T0x401000 -D0x0 -R0x1000
searching for runtime.a in $WORK/runtime.a
searching for runtime.a in /go/pkg/linux_amd64/runtime.a
searching for runtime.a in /usr/local/go/pkg/linux_amd64/runtime.a
 0.00 deadcode
 0.03 pclntab=1408982 bytes, funcdata total 187752 bytes
 0.04 dodata
 0.05 dwarf
 0.08 symsize = 0
 0.12 reloc
 0.13 asmb
 0.13 codeblk
 0.14 datblk
 0.15 sym
 0.15 symsize = 256320
 0.16 symsize = 258288
 0.16 dwarf
 0.18 headr
 0.19 host link: "gcc" "-m64" "-gdwarf-2" "-o" "/tmp/go-build416718997/command-line-arguments/_obj/exe/a.out" "-rdynamic" "/tmp/go-link-567465902/go.o" "/tmp/go-link-567465902/000000.o" "/tmp/go-link-567465902/000001.o" "/tmp/go-link-567465902/000002.o" "-g" "-O2" "-g" "-O2" "-Wl,-Bstatic" "-lrdkafka" "-Wl,-Bdynamic" "-lrdkafka" "-lpthread" "-lz" "-lcrypto" "-lssl" "-lsasl2" "-lrt" "-g" "-O2" "-lpthread"
 0.29 cpu time
94449 symbols
84360 liveness data
$ ldd blu
	linux-vdso.so.1 (0x00007ffda22d0000)
	librdkafka.so.1 => /usr/lib/x86_64-linux-gnu/librdkafka.so.1 (0x00007f3b0fc16000)
	libpthread.so.0 => /lib/x86_64-linux-gnu/libpthread.so.0 (0x00007f3b0f9f9000)
	libz.so.1 => /lib/x86_64-linux-gnu/libz.so.1 (0x00007f3b0f7dd000)
	libcrypto.so.1.0.0 => /usr/lib/x86_64-linux-gnu/libcrypto.so.1.0.0 (0x00007f3b0f3e1000)
	libssl.so.1.0.0 => /usr/lib/x86_64-linux-gnu/libssl.so.1.0.0 (0x00007f3b0f180000)
	libsasl2.so.2 => /usr/lib/x86_64-linux-gnu/libsasl2.so.2 (0x00007f3b0ef63000)
	librt.so.1 => /lib/x86_64-linux-gnu/librt.so.1 (0x00007f3b0ed5b000)
	libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007f3b0e9b0000)
	/lib64/ld-linux-x86-64.so.2 (0x0000562bb6602000)
	libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007f3b0e7ab000)
	libresolv.so.2 => /lib/x86_64-linux-gnu/libresolv.so.2 (0x00007f3b0e594000)

As you can see librdkafka appears twice when linking.

I tried playing with pkg-config but could not get correct LDFLAGS without the duplicated -lrdkafka.
The only fix I could think of is manually setting LDFLAGS in kafka/build_static.go:

// +build static
// +build !static_all

package kafka

// #cgo LDFLAGS: -Wl,-Bstatic -lrdkafka -Wl,-Bdynamic -lpthread -lz -lcrypto -lssl -lsasl2 -lrt
import "C"

As I've never built with cgo before I might be doing something wrong, is this expected ?

Slow Producer and huge memory leak

I am sending messages to Kafka using this code:

	deliveryChan := make(chan kafka.Event)
	topic:=viper.GetString("kafka.forwardtopic")
	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, deliveryChan)
	e := <-deliveryChan
	m := e.(*kafka.Message)

	if m.TopicPartition.Error != nil {
		logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
	}
	close(deliveryChan)

However this is extremely slow. Sometimes it takes a second or even two. I guess it hangs on:

e := <-deliveryChan
This is presumably because it is waiting for the Kafka acknowledgement.

So I tried the same without the channel, because I don't really need the Kafka acknowledgement:

	//deliveryChan := make(chan kafka.Event)
	topic:=viper.GetString("kafka.forwardtopic")
	kf.Producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(bulk)}, nil)
	//e := <-deliveryChan
	//m := e.(*kafka.Message)

	//if m.TopicPartition.Error != nil {
	//	logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
	//}
	//close(deliveryChan)

But this creates a huge memory leak and my app crashes after a few minutes.
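
A likely cause: when no delivery channel is passed, delivery reports are emitted on Producer.Events(), and if nothing reads that channel the reports accumulate without bound. A sketch that drains them, reusing kf.Producer and logger from the snippet above:

go func() {
	for e := range kf.Producer.Events() {
		if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
			logger.Errorf("Delivery failed: %v\n", m.TopicPartition.Error)
		}
	}
}()

Alternatively, if acknowledgements are truly not needed, setting "go.delivery.reports": false in the producer ConfigMap disables delivery reports entirely.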

Problem with librdkafka

Hi,

I've been trying to use confluent-kafka-go, and after installing librdkafka I get the following error:

λ go install
# github.com/confluentinc/confluent-kafka-go/kafka
could not determine kind of name for C.rd_kafka_conf_set_default_topic_conf

I've googled it and found nothing. Can someone please explain what is happening?

By the way, I'm using a Mac with OS X Yosemite.

Setting offsets

I am writing a tool similar to kafkacat due to certain deserialization requirements. I would like to be able to specify an offset relative to the tail of a topic. However when using kafka.OffsetTail, I get an invalid offset when trying to make a new consumer.

I looked at the source for kafkacat and saw that it uses the same function in C as in cgo that I thought would work for Go. I also tried the go-kafkacat example with the -o option and had the same result.

Am I missing something obvious?
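
One approach worth trying is to assign partitions directly with kafka.OffsetTail rather than relying on configuration; a sketch with placeholder topic/partition values, assuming a consumer c:

topic := "myTopic"
err := c.Assign([]kafka.TopicPartition{
	// Start five messages before the current end of partition 0.
	{Topic: &topic, Partition: 0, Offset: kafka.OffsetTail(5)},
})
if err != nil {
	panic(err)
}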

consumer: Close() blocks indefinitely

Hello.

I'm using confluent-kafka-go 0.9.4 with librdkafka 0.9.5 on MacOS. I create two consumers consuming from the same topic and all is fine until I call Close() on them, which blocks forever on: https://github.com/confluentinc/confluent-kafka-go/blob/v0.9.4/kafka/consumer.go#L255

Debug logs:

# ...
%7|1495808566.326|CGRPTERM|rdkafka#consumer-2| [thrd:main]: Terminating group "127.0.0.1" in state up with 1 partition(s)
%7|1495808566.326|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Terminating group "127.0.0.1" in state up with 1 partition(s)
%7|1495808566.326|UNSUBSCRIBE|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": unsubscribe from current subscription of 1 topics (leave group=yes, join state wait-revoke-rebalance_cb, v4)
%7|1495808566.326|UNSUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": unsubscribe from current subscription of 1 topics (leave group=yes, join state wait-revoke-rebalance_cb, v4)
%7|1495808566.326|SUBSCRIPTION|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": clearing subscribed topics list (1)
%7|1495808566.326|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": clearing subscribed topics list (1)
%7|1495808566.326|SUBSCRIPTION|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": effective subscription list changed from 1 to 0 topic(s):
%7|1495808566.326|SUBSCRIPTION|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": effective subscription list changed from 1 to 0 topic(s):
%7|1495808566.326|CGRPTERM|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808566.326|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808566.326|CGRPTERM|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808566.326|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808566.420|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808566.610|CGRPTERM|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808567.424|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808567.613|CGRPTERM|rdkafka#consumer-2| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
%7|1495808568.425|CGRPTERM|rdkafka#consumer-1| [thrd:main]: Group "127.0.0.1": waiting for rebalance_cb, 1 toppar(s), 0 unassignment(s), 0 commit(s), wait-unassign flag, (state up, join-state wait-revoke-rebalance_cb) before terminating
# ... (goes on forever)

librdkafka configuration:

{
 "api.version.request": true,
 "bootstrap.servers": "foo-bar.com",
 "log.connection.close": false,
 "session.timeout.ms": 6000,
 "go.events.channel.enable": true,
 "go.application.rebalance.enable": true,
 "enable.auto.commit": false,
 "debug": "cgrp,topic,protocol"
}

Setting go.events.channel.enable to false makes Close() proceed. However I'm calling Assign() and Unassign() properly in the consumer loop (as demonstrated in the examples).

This also doesn't happen with a single consumer. It seems that it's the rebalancing that causes this.

Any ideas?

Thanks in advance.

README: Link to CLAHub

Please have the README link to the CLAHub license, so that contributors can easily find and sign the license.

topic config ignored

Neither the auto.commit.enable nor the auto.commit.interval.ms setting seems to have any effect, which makes me think that maybe the whole default.topic.config is ignored.

package main

import (
	"fmt"
	"os"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

type KafkaSubscriptionEvent interface {
	// String returns a human-readable representation of the event
	String() string
}

type KafkaSubscription struct {
	Events    chan KafkaSubscriptionEvent
	Consumer  *kafka.Consumer
	IsRunning bool
}

func main() {
	subscription, err := NewKafkaSubscription("g1", "localhost:9092", []string{"t1"})

	if err != nil {
		panic(err)
	}

	defer subscription.Close()
	subscription.Start()
}

func NewKafkaSubscription(group string, brokers string, topics []string) (*KafkaSubscription, error) {
	hostname, err := os.Hostname()

	if err != nil {
		return nil, err
	}

	clientId := fmt.Sprint(hostname+"."+group+".", os.Getpid())

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               brokers,
		"client.id":                       clientId,
		"group.id":                        group,
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"default.topic.config": kafka.ConfigMap{
			"auto.offset.reset":       "smallest",
			"auto.commit.enable":      false,
			"auto.commit.interval.ms": 86400000,
		}})

	if err != nil {
		return nil, err
	}

	err = consumer.SubscribeTopics(topics, nil)

	if err != nil {
		return nil, err
	}

	return &KafkaSubscription{
		Events:    make(chan KafkaSubscriptionEvent),
		Consumer:  consumer,
		IsRunning: false,
	}, nil
}

func (subscription *KafkaSubscription) Start() {
	subscription.IsRunning = true
	for subscription.IsRunning {
		select {
		case ev := <-subscription.Consumer.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				subscription.Events <- e
				subscription.Consumer.Assign(e.Partitions)
			case kafka.RevokedPartitions:
				subscription.Consumer.Unassign()
				subscription.Events <- e
			case *kafka.Message:
				subscription.Events <- e
			case kafka.PartitionEOF:
				subscription.Events <- e
			case kafka.Error:
				subscription.Events <- e
				subscription.IsRunning = false
			case kafka.OffsetsCommitted:
				fmt.Println(e)
			}
		}
	}
}

func (subscription *KafkaSubscription) Close() {
	close(subscription.Events)
	subscription.Consumer.Close()
}

After some seconds the following message appears in the console:

OffsetsCommitted (<nil>, [t1[0]@unset t1[1]@unset t1[2]@unset t1[3]@4 t1[4]@unset t1[5]@unset t1[6]@4 t1[7]@unset t1[8]@unset t1[9]@4])

Channel consumer and batch producers ignore configured batch sizes internally

When using a channel-based consumer, the size of the event channel is configurable. However, the internal consumeReader function passes the maxEvents parameter as a hard-coded 1000 (the default config).

Same goes for batch producers: the batch/channel size can be specified through the ConfigMap parameter, but the channelBatchProducer relies on a const batchSize int = 100000 (again, the default batch size).

I've opened a PR to have the maxEvents and batchSize depend on the config and/or the actual space available in the channel:

#17

Could not determine kind of name for C.RD_KAFKA_EVENT_STATS

Pulled the latest changes. When I try to run my Go code I get this error:
could not determine kind of name for C.RD_KAFKA_EVENT_STATS

Here is my machine configuration:

GOARCH="amd64"
GOBIN=""
GOEXE=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOOS="darwin"
GOPATH="/Users/rb/workspace"
GORACE=""
GOROOT="/usr/local/Cellar/go/1.8.1/libexec"
GOTOOLDIR="/usr/local/Cellar/go/1.8.1/libexec/pkg/tool/darwin_amd64"
GCCGO="gccgo"
CC="clang"
GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/jm/rf97g_5d6p91cpp6s_jgzn140000gn/T/go-build788309128=/tmp/go-build -gno-record-gcc-switches -fno-common"
CXX="clang++"
CGO_ENABLED="1"
PKG_CONFIG="pkg-config"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"

How to build against Linux targets?

When I use GOOS=linux go build against confluent-kafka-go, Go complains "no buildable Go source files".

Which is too bad, because I would like to be able to create a Linux/amd64 binary of a Kafka app for use in Docker.

OSX: unable to build kafkacat

Followed the install instructions ("brew install pkg-config git librdkafka") and got:

$ go get gopkg.in/confluentinc/confluent-kafka-go.v0/kafka
$ go get gopkg.in/confluentinc/confluent-kafka-go.v0/examples/go-kafkacat
# github.com/confluentinc/confluent-kafka-go/kafka
github.com/confluentinc/confluent-kafka-go/kafka/producer.go:46:9: error: use of undeclared identifier 'RD_KAFKA_V_END'
        RD_KAFKA_V_END);
        ^
1 error generated.
$

Build system is an El Capitan Mac, running "go version go1.7.5 darwin/amd64".

I wondered whether a header file was not being picked up, but if that were the cause it would be a bit weird that only one identifier fails. Any suggestions?

Consumer.SubscribeTopics for more than one topic is not working

Hello, I'm trying to subscribe to 2 topics at once by calling Consumer.SubscribeTopics, using this code:

	err := q.Consumer.SubscribeTopics(q.Topics, nil)
	if err != nil {
		l.WithError(err).Error("error subscribing to topics")
		return err
	}

	l.Info("successfully subscribed to topics")

	for q.run == true {
		select {
		case ev := <-q.Consumer.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				err = q.assignPartitions(e.Partitions)
				if err != nil {
					l.WithError(err).Error("error assigning partitions")
				}
			case kafka.RevokedPartitions:
				err = q.unassignPartitions()
				if err != nil {
					l.WithError(err).Error("error revoking partitions")
				}
			case *kafka.Message:
				q.receiveMessage(e.TopicPartition, e.Value)
			case kafka.PartitionEOF:
				q.handlePartitionEOF(ev)
			case kafka.OffsetsCommitted:
				q.handleOffsetsCommitted(ev)
			case kafka.Error:
				q.handleError(ev)
				q.StopConsuming()
				return e
			default:
				q.handleUnrecognized(e)
			}
		}
	}

q.Topics contains two topics that are successfully printed in the log.

But I'm only receiving kafka.AssignedPartitions events for one of the topics (the last in the list), and I'm also only receiving messages sent to that topic. Any hints? Am I doing something wrong?

I'm configuring kafka as follows:

kafka.NewConsumer(&kafka.ConfigMap{
			"bootstrap.servers":               q.Brokers,
			"group.id":                        q.ConsumerGroup,
			"session.timeout.ms":              q.SessionTimeout,
			"go.events.channel.enable":        true,
			"go.application.rebalance.enable": true,
			"enable.auto.commit":              true,
			"default.topic.config": kafka.ConfigMap{
				"auto.offset.reset":  q.OffsetResetStrategy,
				"auto.commit.enable": true,
			},
		})

kafka version is 0.10.1.1

Producer not giving any error when the kafka brokers are down

The producer and consumer work well when the Kafka broker is up; when the broker goes down, the producer does not give any error.

err := cp.producer.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topicName, Partition: kafka.PartitionAny}, Value: []byte(message)}, cp.deliveryChannel)

if err != nil {
	fmt.Println("Print Error")
	fmt.Println(err)
}
e := <-cp.deliveryChannel
m := e.(*kafka.Message)

In the above code the producer gets stuck at the channel receive (second-to-last line) when the broker is down, and it remains stuck there even after the broker comes back online. Even the Produce call does not return an error. Is there no way to handle cases where Kafka goes offline for some time and comes back online? I get the log statements below on my console.
%3|1484314998.395|FAIL|rdkafka#producer-1| localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1484314998.396|ERROR|rdkafka#producer-1| localhost:9092/bootstrap: Connect to ipv4#127.0.0.1:9092 failed: Connection refused
%3|1484314998.396|ERROR|rdkafka#producer-1| 1/1 brokers are down

json.Marshal results in "panic: runtime error: cgo argument has Go pointer to Go pointer"

Hi there!

Changing the producer_example.go to marshal a struct (see below) results in this error:

» go run producer_example.go localhost:9092 foo
Created Producer rdkafka#producer-1
panic: runtime error: cgo argument has Go pointer to Go pointer

goroutine 1 [running]:
panic(0x40d59a0, 0xc420ff0090)
	/usr/local/Cellar/go/1.7.4_1/libexec/src/runtime/panic.go:500 +0x1a1
github.com/confluentinc/confluent-kafka-go/kafka._cgoCheckPointer0(0xc42009a024, 0x0, 0x0, 0x0, 0x0)
	??:0 +0x59
github.com/confluentinc/confluent-kafka-go/kafka.(*Producer).produce(0xc4200a0000, 0xc42004deb0, 0x0, 0xc420098120, 0x0, 0xc42009a000)
	/Users/matthias/code/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go:130 +0x31b
github.com/confluentinc/confluent-kafka-go/kafka.(*Producer).Produce(0xc4200a0000, 0xc42004deb0, 0xc420098120, 0x11, 0x40)
	/Users/matthias/code/src/github.com/confluentinc/confluent-kafka-go/kafka/producer.go:154 +0x48
main.main()
	/Users/matthias/code/src/github.com/confluentinc/confluent-kafka-go/examples/producer_example/producer_example.go:53 +0x454
exit status 2

Here are the changes to producer_example.go:

diff --git a/examples/producer_example/producer_example.go b/examples/producer_example/producer_example.go
index f81f98a..1bf4ab4 100644
--- a/examples/producer_example/producer_example.go
+++ b/examples/producer_example/producer_example.go
@@ -18,9 +18,11 @@ package main
  */

 import (
+       "encoding/json"
        "fmt"
-       "github.com/confluentinc/confluent-kafka-go/kafka"
        "os"
+
+       "github.com/confluentinc/confluent-kafka-go/kafka"
 )

 func main() {
@@ -47,7 +49,7 @@ func main() {
        // .Events channel is used.
        deliveryChan := make(chan kafka.Event)

-       value := "Hello Go!"
+       value, _ := json.Marshal(struct{ M string }{M: "Hello Go!"})
        err = p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny}, Value: []byte(value)}, deliveryChan)

        e := <-deliveryChan

Should this work?

Schema registry support

I am not able to pass the schema registry URL in the config map. It would be great if somebody could point me to the right place; I looked at all the documentation. We are using Confluent Kafka and the schema registry all over the place. In Java and Python we are able to pass it, but I am not able to do so in Go.
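
For context, later releases of confluent-kafka-go ship a schemaregistry package with serializers that plug into the producer and consumer; a minimal sketch, assuming the v2 module layout:

import (
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
	"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro"
)

// newAvroSerializer is a hypothetical constructor for an Avro serializer
// backed by a Schema Registry client.
func newAvroSerializer(url string) (*avro.GenericSerializer, error) {
	client, err := schemaregistry.NewClient(schemaregistry.NewConfig(url))
	if err != nil {
		return nil, err
	}
	// The serializer registers/looks up schemas and encodes message values.
	return avro.NewGenericSerializer(client, serde.ValueSerde, avro.NewSerializerConfig())
}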

ConsumerGroup is not consuming after kafka broker restart

Hi, I'm testing if my app that uses confluent-kafka-go is resilient to kafka broker restarts.
But when the broker comes back up, the app is not consuming anything.

Kafka logs this when the app starts:

[2017-01-09 15:11:17,469] INFO [GroupCoordinator 1002]: Preparing to restabilize group mytopic-consumer-group with old generation 6 (kafka.coordinator.GroupCoordinator)
[2017-01-09 15:11:22,299] INFO [GroupCoordinator 1002]: Stabilized group mytopic-consumer-group generation 7 (kafka.coordinator.GroupCoordinator)
[2017-01-09 15:11:22,506] INFO [GroupCoordinator 1002]: Assignment received from leader for group mytopic-consumer-group for generation 7 (kafka.coordinator.GroupCoordinator)

But when the broker restarts the GroupCoordinator logs nothing.

Should it be asking for AssignPartitions again when the connection is lost/reestablished?

Bulk data requests to multiple channels

Is there any example of how to process data from multiple channels, but not by subscribing to all of the channels at once? I would prefer to check whether there is any data in a channel and, if there is, load it, do some computations, discard it, and continue with the next channel. Is that possible?
There is potentially a lot of data in the system, and I need to group it by MAC address and timestamp during the computations, so Kafka is used similarly to the first step of a bucket sort. The data is divided into topics according to the first three letters of the MAC address.

May I do something like adding c.UnsubscribeTopics(topics, ...) to the kafka.PartitionEOF case in the consumer channel example?

Always getting -1001 for topicPartition offset value

I'm trying to work on offset commit management with this library. My hope is that, when I'm using the channel-based consumer and I get a kafka.AssignedPartitions event (using the latest auto-reset strategy), the offset I get back for each topic/partition contains the starting offset I was assigned. What I'm seeing is that all partitions return -1001 as the offset value no matter what I do.

When I look at the output of burrow, it shows me that the consumer group is sending its commits in what appears to be a correct manner.

Any idea what might be going on?
thanks!

LOG MESSAGE THAT PRINTS OUT

topic=testtopic part=0 offset=-1001 err=<nil> low=11502464 high=12057147
topic=testtopic part=1 offset=-1001 err=<nil> low=11535834 high=15125592
topic=testtopic part=2 offset=-1001 err=<nil> low=11450986 high=14338118
topic=testtopic part=3 offset=-1001 err=<nil> low=11315825 high=13661644

BURROW OUTPUT


  "error": false,
  "message": "consumer group status returned",
  "status": {
    "cluster": "testcluster",
    "group": "mytestclient",
    "status": "OK",
    "complete": false,
    "partitions": [
      {
        "topic": "testtopic",
        "partition": 0,
        "status": "OK",
        "start": {
          "offset": 13625698,
          "timestamp": 1485219538694,
          "lag": 0
        },
        "end": {
          "offset": 13639130,
          "timestamp": 1485220774613,
          "lag": 1662449
        }
      },
// Example channel-based high-level Apache Kafka consumer
package main


import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
	"os"
	"os/signal"
	"syscall"
)

func main() {

	broker := "127.0.0.1"
	group := "mytestclient"
	topics := []string{"testtopic"}

	sigchan := make(chan os.Signal)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)


	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":               broker,
		"group.id":                        group,
		"session.timeout.ms":              6000,
		"go.events.channel.enable":        true,
		"go.application.rebalance.enable": true,
		"default.topic.config":            kafka.ConfigMap{"auto.offset.reset": "latest"}})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)

	run := true

	for run == true {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false

		case ev := <-c.Events():
			switch e := ev.(type) {
			case kafka.AssignedPartitions:
				for _, tp := range e.Partitions {
					low, high, _ := c.QueryWatermarkOffsets("testtopic", tp.Partition, 3000)
					fmt.Printf("topic=%s part=%d offset=%d err=%v low=%d high=%d\n", *tp.Topic, tp.Partition, tp.Offset, tp.Error, low, high)

				}
				c.Assign(e.Partitions)
			case kafka.RevokedPartitions:
				fmt.Fprintf(os.Stderr, "%% %v\n", e)
				c.Unassign()
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
			case kafka.PartitionEOF:
				fmt.Printf("%% Reached %v\n", e)
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)
				run = false
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

Add support for the topic admin APIs

I'd like to start work on a Terraform provider for Kafka (unless you're already planning on doing that yourselves) that can give me a way to declaratively define topics and their configurations, but I'd need to use the new Kafka 0.11 APIs for creating, modifying and deleting topics. Is this something you plan to support in the Go library?
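
For reference, later releases added an AdminClient wrapping the Kafka 0.11+ admin protocol; a minimal topic-creation sketch (a fragment, assuming imports of context, fmt, time, and the kafka package):

a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
	panic(err)
}
defer a.Close()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// CreateTopics returns one result per requested topic.
results, err := a.CreateTopics(ctx, []kafka.TopicSpecification{{
	Topic:             "myTopic",
	NumPartitions:     4,
	ReplicationFactor: 1,
}})
if err != nil {
	panic(err)
}
for _, result := range results {
	fmt.Printf("%s: %s\n", result.Topic, result.Error)
}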

Logger integration

I ran a test to see what happened when my application encountered a broker that was shut down. Instead of getting back an error from kafka.NewProducer or from a (*kafka.Message).TopicPartition.Error set on message submission, what I saw was that the library appears to write directly to stderr. An example:

%3|1490436188.086|ERROR|kpush#producer-1| [thrd:localhost:9092/bootstrap]: localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused
%3|1490436188.086|ERROR|kpush#producer-1| [thrd:localhost:9092/bootstrap]: 1/1 brokers are down

Is there some way to control this behavior? I didn't see any options for controlling the logging output in

https://github.com/confluentinc/confluent-kafka-go
http://docs.confluent.io/3.2.0/clients/confluent-kafka-go/index.html

could not determine kind of name for C.RD_KAFKA_EVENT_STATS

librdkafka

librdkafka: stable 0.9.5 (bottled), HEAD
The Apache Kafka C/C++ library
https://github.com/edenhill/librdkafka
/usr/local/Cellar/librdkafka/0.9.5 (14 files, 1.6MB) *
  Poured from bottle on 2017-05-22 at 14:00:51
From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/librdkafka.rb
==> Dependencies
Build: pkg-config ✔
Required: lzlib ✔, openssl ✔
Recommended: lz4 ✔
==> Options
--without-lz4
	Build without lz4 support
--HEAD
	Install HEAD version

go env

GOARCH="amd64"
GOBIN=""
GOEXE=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOOS="darwin"
GOPATH="/Users/nut-abctech/go"
GORACE=""
GOROOT="/usr/local/Cellar/go/1.7.4_1/libexec"
GOTOOLDIR="/usr/local/Cellar/go/1.7.4_1/libexec/pkg/tool/darwin_amd64"
CC="clang"
GOGCCFLAGS="-fPIC -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/hh/w1s3l2550w33x60tfgnjhwnh0000gn/T/go-build059603000=/tmp/go-build -gno-record-gcc-switches -fno-common"
CXX="clang++"
CGO_ENABLED="1"

The error occurs when I try to go get -u github.com/confluentinc/confluent-kafka-go/kafka

Golang 1.8 support

Test that it works, get it running on travis, do a performance comparison

Detecting that the brokers are down in Producer

I'm trying a case where the brokers are not up and then I send a message through the producer. I expected to receive some kind of error on the Producer.Events channel; however, I received none.

Is there any way to detect that the brokers are not up, so that I can alert that no messages are really being sent?

EDIT:
Actually after a lot of time I got an error:
Message timed out

Still, I feel that this could be handled better...

RFC: Batch consume API

@edenhill: After discussing with @EVODelavega in Issue #13 I've come here to ask for input. I'm not very familiar with librdkafka, but I understand kafka.Consumer.Poll uses the channel-based API, which wraps a call to C._rk_queue_poll internally, to receive a single message.

What do you think about exposing a lower-level call to librdkafka poll functions, without the channel-based helper, which can return a slice of messages for the user to handle?

Our use-case is to fetch a batch of messages, then process sequentially in a loop, committing only upon reaching some condition.

I think the problematic part is that package users will then have to manage Kafka status messages. That could be deferred to be handled by the user, or helpers could be offered.
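
In the meantime, batching can be approximated in user code on top of Poll; a hypothetical helper (names and timeout policy are assumptions):

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// readBatch accumulates up to max messages, returning early once the
// timeout elapses or an error event is seen.
func readBatch(c *kafka.Consumer, max int, timeout time.Duration) []*kafka.Message {
	batch := make([]*kafka.Message, 0, max)
	deadline := time.Now().Add(timeout)
	for len(batch) < max {
		remaining := time.Until(deadline)
		if remaining <= 0 {
			break
		}
		switch e := c.Poll(int(remaining.Milliseconds())).(type) {
		case *kafka.Message:
			batch = append(batch, e)
		case kafka.Error, nil:
			return batch // timeout or error; let the caller commit what it has
		}
	}
	return batch
}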

This looks promising however Opaque field is odd type and doesn't work?

In a kafka.Message you have the Opaque field; this is very critical to my application. I am trying to replace Sarama with this wrapper around librdkafka in a POC at very high scale. However, the type of Opaque is *interface{}, which feels wrong; interface{} should be all that is needed, and that is how Sarama does its Metadata field for the same purpose.

installation error on Ubuntu

Hello,

Installed librdkafka on my Ubuntu 14.04 machine.

$ldconfig -p | grep librdkafka
librdkafka.so.1 (libc6,x86-64) => /usr/local/lib/librdkafka.so.1
librdkafka++.so.1 (libc6,x86-64) => /usr/local/lib/librdkafka++.so.1

Tried to do a 'go get' of confluent-kafka-go, but I'm getting the following error. Please help resolve it.
If you need the librdkafka version number, please let me know the command to find it.

thanks,
Buvana

$go get gopkg.in/confluentinc/confluent-kafka-go.v0/kafka

gopkg.in/confluentinc/confluent-kafka-go.v0/kafka

37: error: 'rd_kafka_conf_set_events' undeclared (first use in this function)
37: error: 'rd_kafka_event_destroy' undeclared (first use in this function)
38: error: 'rd_kafka_queue_get_main' undeclared (first use in this function)
38: error: 'rd_kafka_event_error_string' undeclared (first use in this function)
38: error: 'rd_kafka_event_type' undeclared (first use in this function)
38: error: 'RD_KAFKA_EVENT_OFFSET_COMMIT' undeclared (first use in this function)
38: error: 'rd_kafka_event_error' undeclared (first use in this function)
38: error: 'rd_kafka_event_name' undeclared (first use in this function)
38: error: 'rd_kafka_commit_queue' undeclared (first use in this function)
38: error: 'rd_kafka_queue_get_consumer' undeclared (first use in this function)
38: error: 'rd_kafka_event_topic_partition_list' undeclared (first use in this function)
38: error: 'rd_kafka_queue_poll' undeclared (first use in this function)
38: error: 'RD_KAFKA_EVENT_REBALANCE' undeclared (first use in this function)

Slow consumer flow

Hello Magnus,

I am building a Kafka consumer using the Go bindings, and I have some questions regarding the multiplexing of control and data events in a single channel (Events()).

Setup:

  • Slow consumer, one job at a time (a job can take up to 10min to process)
  • I am using the channel based consumer API
  • Manual commits per message

Our processing function is hooked directly to a consumer.Events() loop, along with handlers for assigned and revoked partitions. Since a call can block for up to 10 minutes, the Events() channel buffer might fill up. Is this something to worry about?

Also, what is going to happen if e.g. a partition is revoked or assigned but that event is stuck behind a set of kafka.Messages? Those control messages are going to be delivered after several minutes.

So basically my question is: what's the correct way to implement a slow consumer?

Thank you,
Chris

Linux builds fail

I can successfully use the library on Mac, but if I try to use it on Linux I get the following error when building:

~/go/src/github.com/confluentinc/confluent-kafka-go/examples/go-kafkacat# go build
# github.com/confluentinc/confluent-kafka-go/kafka
could not determine kind of name for C.RD_KAFKA_RESP_ERR_INVALID_CONFIG
could not determine kind of name for C.RD_KAFKA_RESP_ERR_INVALID_PARTITIONS
could not determine kind of name for C.RD_KAFKA_RESP_ERR_INVALID_REPLICATION_FACTOR
could not determine kind of name for C.RD_KAFKA_RESP_ERR_INVALID_REPLICA_ASSIGNMENT
could not determine kind of name for C.RD_KAFKA_RESP_ERR_INVALID_REQUEST
could not determine kind of name for C.RD_KAFKA_RESP_ERR_NOT_CONTROLLER
could not determine kind of name for C.RD_KAFKA_RESP_ERR_TOPIC_ALREADY_EXISTS
could not determine kind of name for C.RD_KAFKA_RESP_ERR_UNSUPPORTED_FOR_MESSAGE_FORMAT
could not determine kind of name for C.RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE
could not determine kind of name for C.RD_KAFKA_RESP_ERR__WAIT_CACHE

I have tried on two Linux machines, one Debian and one latest Ubuntu 17.04.
Go version 1.8
GCC 6.3.0

Also tried in a clean Docker container based on Ubuntu 17.04 with Go 1.8, and got exactly the same results.

Cannot Install on OSX Sierra 10.12.3

brew install librdkafka --> librdkafka 0.9.3 is installed

go version go1.7.4 darwin/amd64

$ go get github.com/confluentinc/confluent-kafka-go
package github.com/confluentinc/confluent-kafka-go: no buildable Go source files in /Users/mhaan/AppDev/go/src/github.com/confluentinc/confluent-kafka-go
$ cd kafka
$ go install
github.com/confluentinc/confluent-kafka-go/kafka
./producer.go:46:9: error: use of undeclared identifier 'RD_KAFKA_V_END'
RD_KAFKA_V_END);
^
1 error generated.
