wvanbergen / kafka
Load-balancing, resuming Kafka consumer for go, backed by Zookeeper.
License: MIT License
Hi, thanks for sharing the great library!
In my case the consumer processes messages asynchronously, so processing may finish out of order and offsets cannot simply be committed serially. What is the best practice for handling offsets so that they are committed correctly?
for {
select {
case m, ok := <- consumer.Messages():
if ok {
go func(m *sarama.ConsumerMessage) {
exportMessage := worker.Ingest(m)
// TODO: Commit the offsets
}(m)
}
}
}
Hi, anyone?
I created a consumer group to consume several topics and printed the offset of each topic. I found that consumption of some topics is delayed.
Meanwhile, a consumer group I created with "github.com/Shopify/sarama" consumes the topics normally.
Can anyone help me fix the issue?
tips:
**normal consumption source code**
**consumption delay source code**
kafka_0.8.2_group.go.zip
delay_log.txt
I run Kafka in Docker using the image wurstmeister/kafka:0.9.0.1.
docker-compose.yml looks like:
### zookeeper #########################################
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
  networks:
    - backend
### kafka #########################################
kafka:
  image: wurstmeister/kafka:0.9.0.1
  ports:
    - "9092:9092"
  environment:
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_ADVERTISED_HOST_NAME: "172.23.0.8"
    KAFKA_CREATE_TOPICS: "KtRoomMessage:1:1,KtRoomMergeMessage:1:1,KtRoomDelMessage:1:1,KtRoomEditMessage:1:1,KtMessageFeed:1:1,KafkaPushsTopic:1:1"
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
  # volumes:
  #   - ../database/kafka:/tmp/kafka-logs
  #   - ../database/zookeeper:/tmp/zookeeper
  # volumes:
  #   - /var/run/docker.sock:/var/run/docker.sock
  depends_on:
    - zookeeper
  networks:
    backend:
      ipv4_address: 172.23.0.8
### Networks Setup ############################################
networks:
  frontend:
    driver: "bridge"
  backend:
    driver: "bridge"
    ipam:
      config:
        - subnet: 172.23.0.0/24
It works fine with kafka-console-consumer.sh:
➜ docker git:(master) ✗ docker-compose exec kafka bash
bash-4.3# cd /opt/kafka_2.11-0.9.0.1/
bash-4.3# ./bin/kafka-console-consumer.sh --zookeeper zookeeper:2181 --topic KtRoomMessage
{"id":1395866053756864,"room_id":1371649991298042,"uid":1800329207,"username":"","room_type":"vip","message_type":"normal","content":"sdf","image_urls":"","date":"2017-04-11","status":1,"ip":"172.23.0.14","created_at":"2017-04-11 16:30:30","updated_at":"2017-04-11 16:30:30","subRoomId":1371649991298042}
It also works fine with github.com/wvanbergen/kafka/tools/consoleconsumer:
root@101fe7f7fb4d:/go/src/github.com/wvanbergen/kafka/tools/consoleconsumer# ./consoleconsumer -brokers kafka:9092 -topic KtRoomMessage
Offset: 0
Key:
Value: {"id":1395864139704472,"room_id":1371649991298042,"uid":1800329207,"username":"","room_type":"vip","message_type":"normal","content":"sdf","image_urls":"","date":"2017-04-11","status":1,"ip":"172.23.0.14","created_at":"2017-04-11 15:58:36","updated_at":"2017-04-11 15:58:36","subRoomId":1371649991298042}
But it does not work with consumergroup:
root@101fe7f7fb4d:/go/src/github.com/wvanbergen/kafka/tools/consoleconsumer# consumergroup -zookeeper zookeeper:2181 -topics KtRoomMessage -gro dada
2017/04/11 16:33:09 Connected to 172.23.0.4:2181
2017/04/11 16:33:09 Authenticated: id=97772968327577636, timeout=4000
2017/04/11 16:33:09 Re-submitting `0` credentials after reconnect
[Sarama] 2017/04/11 16:33:09 Initializing new client
[Sarama] 2017/04/11 16:33:09 client/metadata fetching metadata for all topics from broker 172.23.0.8:9092
[Sarama] 2017/04/11 16:33:09 Connected to broker at 172.23.0.8:9092 (unregistered)
[Sarama] 2017/04/11 16:33:09 client/brokers registered new broker #1001 at 172.23.0.8:9092
[Sarama] 2017/04/11 16:33:09 Successfully initialized new client
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] Consumer instance registered (101fe7f7fb4d:1ef0f883-7b41-4f63-a970-fcad8b192b4e).
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] Currently registered consumers: 1
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage :: Started topic consumer
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage :: Claiming 1 of 1 partitions
[Sarama] 2017/04/11 16:33:09 [dada/fcad8b192b4e] KtRoomMessage/0 :: Partition consumer starting at offset 8.
[Sarama] 2017/04/11 16:33:09 Connected to broker at 172.23.0.8:9092 (registered as #1001)
[Sarama] 2017/04/11 16:33:09 consumer/broker/1001 added subscription to KtRoomMessage/0
consumergroup does not receive any messages when I push messages to the topic KtRoomMessage.
I create many ConsumerGroup instances with JoinConsumerGroup and fetch messages from Kafka with:
for event := range consumer.Messages() {
// Process event
log.Println(string(event.Value))
eventCount += 1
// Ack event
consumer.CommitUpto(event)
}
When I run my program, consumer.Messages() returns nil. My code follows:
`type KafkaConsumer struct {
Topic string
Kafka //kafka configure
GroupConsumer *consumergroup.ConsumerGroup
Msgs chan *sarama.ConsumerMessage // exposed for use by external callers
Exit chan bool
}
func (k Kafka) NewConsumer(topic string) (*KafkaConsumer, error) {
config := consumergroup.NewConfig()
config.Offsets.Initial = Convert(k.Where)
config.Offsets.ProcessingTimeout = 10 * time.Second
var zookeeperNodes []string
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(topic)
k.Alltopic = append(k.Alltopic, topic)
groupconsumer, err := consumergroup.JoinConsumerGroup(k.Groupname, zookeeperNodes, k.Zookeeper, config)
if err != nil {
return nil, err
}
msgchan := make(chan *sarama.ConsumerMessage, 30)
ret := &KafkaConsumer{Kafka: k, GroupConsumer: groupconsumer,
Msgs: msgchan, Exit: make(chan bool, 1), Topic: topic}
log.Printf("new a zookeeper consumer, info:%s, topic:%s, groupconsumer:%+v\n", k.String(), topic, ret)
return ret, nil
}
func (k *KafkaConsumer) Close() error {
if k.GroupConsumer != nil {
if err := k.GroupConsumer.Close(); err != nil {
log.Printf("stop a zookeeper consumer fail. err:%s hostinfo:%s\n",
err.Error(), k.String())
return err
} else {
log.Printf("stop a zookeeper consumer success, hostinfo:%s\n", k.String())
}
}
k.Exit <- true
return nil
}
func (k *KafkaConsumer) String() string {
return fmt.Sprintf("%s input_topic:%s, groupconsumer:%+v, msgchan=%+v", k.Kafka.String(),
k.Topic, k.GroupConsumer, k.Msgs)
}
func (k *KafkaConsumer) Dispatcher(mylog *Log) {
ticker := time.NewTicker(time.Second * 10)
log.Printf("topic:%s start a dispatcher\n", k.Topic)
for {
select {
case msg := <-k.GroupConsumer.Messages():
if msg != nil && msg.Value != nil && len(msg.Value) > 0 {
k.Msgs <- msg
k.GroupConsumer.CommitUpto(msg)
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, msg_len=%d, send to recevie chan",
k.Alltopic, msg.Topic, len(msg.Value))
} else {
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v,error format msg:%+v",
k.Alltopic, k.Topic, msg)
}
case err := <-k.GroupConsumer.Errors():
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, fetch msg err,%+v", k.Alltopic, k.Topic, err)
case <-k.Exit:
log.Printf("conumser_topic:%+v, msg_topic:%+v, dispatcher exit\n", k.Alltopic, k.Topic)
ticker.Stop()
return
case <-ticker.C:
mylog.Log.Debugf("conumser_topic:%+v, msg_topic:%+v, dispatcher ticker 10s, Consumer:%+v, chan_len:%d",
k.Alltopic, k.Topic, k, len(k.Msgs))
}
}
}`
how to fix it?
best wishes.
I run the example, and find it very slow. I'm not sure where the bottleneck is.
Hi,
I've noticed that the error returned by ReleasePartition
is not handled here:
https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L357
defer cg.instance.ReleasePartition(topic, partition)
If an error is returned (which I guess means that the partition wasn't released), could it prevent another consumer from claiming it?
Thanks,
Benoit
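For illustration, the deferred call quoted above could be wrapped in a closure so that the error is at least logged and surfaced to the caller. This is only a sketch under assumptions: the `releaser` interface, the `fakeInstance` type and the parameter types `string` and `int32` are hypothetical stand-ins, and this is not the library's actual code.

```go
package main

import (
	"errors"
	"log"
)

// releaser is a hypothetical stand-in for the instance type whose
// ReleasePartition method returns an error.
type releaser interface {
	ReleasePartition(topic string, partition int32) error
}

// fakeInstance simulates a release that can fail.
type fakeInstance struct{ fail bool }

func (f fakeInstance) ReleasePartition(topic string, partition int32) error {
	if f.fail {
		return errors.New("partition is not claimed by this instance")
	}
	return nil
}

// consumePartition releases the partition when it returns. A failed release
// is logged and, if nothing else went wrong, reported as the return value.
func consumePartition(r releaser, topic string, partition int32) (err error) {
	defer func() {
		if relErr := r.ReleasePartition(topic, partition); relErr != nil {
			log.Printf("%s/%d: failed to release partition: %v", topic, partition, relErr)
			if err == nil {
				err = relErr
			}
		}
	}()
	// ... message consumption would happen here ...
	return nil
}

func main() {
	if err := consumePartition(fakeInstance{fail: true}, "test_topic", 0); err != nil {
		log.Println("consumer stopped with error:", err)
	}
}
```

Whether an unreleased claim actually blocks other consumers depends on how the claim is stored in Zookeeper, which this sketch does not address.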
I have run into a problem:
Code written using wvanbergen/kafka consumes messages successfully at first, but after receiving 872 messages the consuming loop stopped. I am sure all 872 messages were processed successfully by another goroutine.
I then restarted my program, but still no messages could be consumed.
Meanwhile, in Kafka Manager, the consumer offset has stopped and never grows, while the log size and total lag keep growing.
If I use the Kafka test script kafka-console-consumer.sh with the same ZooKeeper and consumer group, all messages are consumed successfully.
So I can only consider it a bug...
BTW, the same thing happened last week, and I stopped my program at that time. Today I ran my program again; at first everything was fine, but then it stopped again.
The code is shown below (the "received %d msg" log was printed at first but now never prints again, and the log "Message terminated." has never been printed even once):
`consumer, consumerErr := consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
go func() {
for err := range consumer.Errors() {
log.Printf("receive consumer error: %s\n", err)
if consumer.Closed() {
consumer, _ = consumergroup.JoinConsumerGroup(consumerGroupName, kafkaTopics, p.zooServer, config)
}
}
}()
eventCount := 0
offsets := make(map[string]map[int32]int64)
go func() {
log.Infof("Consumer group: close = %t, %s", consumer.Closed(), consumer)
for message := range consumer.Messages() {
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
eventCount += 1
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
log.Printf("unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}
log.Infof("received %d msg", eventCount)
p.recordRecv()
p.msgChan <- message.Value
offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
log.Infoln("Message terminated.")
}()`
Using branch refactor (#72) and after applying PR @ samuel/go-zookeeper#84 to my code tree, my kafkaconsumer still fails to consume data when an IO timeout occurs on the connection to Zookeeper.
To reproduce you can prevent the consumer from accessing zookeeper for a few seconds:
root@root# iptables -A OUTPUT -p tcp -m tcp --dport 2181 -j DROP # Add rule to block outgoing traffic to zookeeper
... Wait a few seconds...
root@root# iptables -D OUTPUT -p tcp -m tcp --dport 2181 -j DROP # Remove rule
Log output (I changed consumerManager.run() so it runs every 10s):
15:19:51 Recv loop terminated: err=read tcp 10.10.12.62:2181: i/o timeout
15:19:51 Send loop terminated: err=<nil>
15:19:51 [instance=f8565f67ec4a] Failed to watch subscription: zk: connection closed. Trying again in 1 second...
15:19:53 Failed to connect to 10.10.12.62:2181: dial tcp 10.10.12.62:2181: i/o timeout
15:19:53 [instance=f8565f67ec4a] Failed to watch subscription: zk: could not connect to a server. Trying again in 1 second...
..
15:20:26 Connected to 10.10.12.62:2181
15:20:26 Authenticated: id=94641789739226871, timeout=4000
15:20:26 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:26 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 2 partitions.
[Sarama] 2015/10/08 15:20:26 consumer/broker/0 closed dead subscription to rqueue.out.bs_msg_in/0
[Sarama] 2015/10/08 15:20:26 consumer/broker/0 closed dead subscription to rqueue.out.bs_msg_in/1
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/0] Offset 579 has been processed. Continuing shutdown...
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/1] FAILED to release partition: Cannot release partition: it is not claimed by this instance
15:20:26 [instance=f8565f67ec4a partition=rqueue.out.bs_msg_in/0] FAILED to release partition: Cannot release partition: it is not claimed by this instance
15:20:36 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:36 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 0 partitions.
15:20:46 [instance=f8565f67ec4a] Currently, 0 instances are registered, to consume 2 partitions in total.
15:20:46 [instance=f8565f67ec4a] This instance is assigned to consume 0 partitions, and is currently consuming 0 partitions.
I noticed you have this comment in partition_consumer.go:
// This is shitty and reallt needs reworking.
p.stream.Close()
if err := p.setSaramaConsumer(0); err != nil {
return err
}
I just had a runtime panic at that Close() line:
"panic: runtime error: close of closed channel"
Before I dig in, since you already know there are issues, I wanted to get your thoughts on what you might have in mind for a fix, or suggestions for places to start poking around.
thanks,
Jim
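As background for the panic quoted above: in Go, calling `close` on an already closed channel always panics, so a `Close()` method that can be reached twice needs a guard. One generic way to make such a method idempotent is `sync.Once`. The sketch below uses a hypothetical `stream` type and is not the library's code or the maintainer's intended fix.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical type that owns a channel and may be closed from
// more than one code path.
type stream struct {
	events chan int
	once   sync.Once
}

// Close closes the events channel exactly once; later calls are no-ops.
func (s *stream) Close() {
	s.once.Do(func() { close(s.events) })
}

func main() {
	s := &stream{events: make(chan int)}
	s.Close()
	s.Close() // would panic with "close of closed channel" without the guard
	_, ok := <-s.events
	fmt.Println("channel open:", ok) // prints "channel open: false"
}
```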
Currently, the library attempts to split partitions among consumers in a completely even manner (each consumer gets the same number of partitions). For example, if you have 32 partitions and 9 consumers, 8 consumers are assigned 4 partitions each and one is left with none. Is that intentional? It seems like a pretty easy fix.
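To make the alternative concrete, here is a small sketch of a distribution in which no consumer is left idle: every consumer gets the integer quotient, and the remainder is handed out one partition at a time. The function name `partitionsPerConsumer` is hypothetical and this is not how the library currently divides partitions.

```go
package main

import "fmt"

// partitionsPerConsumer returns how many partitions each of n consumers would
// receive if p partitions were spread as evenly as possible.
func partitionsPerConsumer(p, n int) []int {
	if n <= 0 {
		return nil
	}
	counts := make([]int, n)
	base, extra := p/n, p%n
	for i := range counts {
		counts[i] = base
		if i < extra {
			counts[i]++ // the first `extra` consumers take one more
		}
	}
	return counts
}

func main() {
	fmt.Println(partitionsPerConsumer(32, 9)) // prints [4 4 4 4 4 3 3 3 3]
}
```

With 32 partitions and 9 consumers, five consumers take 4 partitions and four take 3, so the counts differ by at most one.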
Hello,
I'm trying to write small consumer group application using several workers.
Almost everything works correctly, but in the supplied example (consumergroup.go) I tried changing the logic from:
for message := range consumer.Messages() {
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
to:
for {
select {
case message := <-consumer.Messages():
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
When I try to stop the worker with ^C, I get a nil pointer dereference:
[Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] ip-rep-input :: Stopped topic consumer
[Sarama] 2015/08/16 16:59:58 [consumer_example.go/4932df2e5698] Deregistered consumer instance Greyhound.local:9b307c47-696a-4419-9b93-4932df2e5698.
[Sarama] 2015/08/16 16:59:58 Closing Client
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x2d9d]
goroutine 1 [running]:
main.main()
/Users/eric/sync/Work/go/src/cybertonica.com/irb/cg/cg.go:75 +0xbcd
What am I doing wrong?
The full modified source code is:
package main
import (
"flag"
"log"
"os"
"os/signal"
"strings"
"time"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
"github.com/wvanbergen/kazoo-go"
)
const (
defaultKafkaTopics = "test_topic"
defaultConsumerGroup = "consumer_example.go"
)
var (
consumerGroup = flag.String("group", defaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
kafkaTopicsCSV = flag.String("topics", defaultKafkaTopics, "The comma-separated list of topics to consume")
zookeeper = flag.String("zookeeper", "", "A comma-separated Zookeeper connection string (e.g. `zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181`)")
zookeeperNodes []string
)
func init() {
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}
func main() {
flag.Parse()
if *zookeeper == "" {
flag.PrintDefaults()
os.Exit(1)
}
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)
kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")
consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
if err := consumer.Close(); err != nil {
sarama.Logger.Println("Error closing the consumer", err)
}
}()
go func() {
for err := range consumer.Errors() {
log.Println(err)
}
}()
eventCount := 0
offsets := make(map[string]map[int32]int64)
for {
select {
case message := <-consumer.Messages():
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
eventCount++
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}
// Simulate processing time
time.Sleep(10 * time.Millisecond)
offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
}
log.Printf("Processed %d events.", eventCount)
log.Printf("%+v", offsets)
}
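The panic reported above is consistent with a general Go rule: a receive on a closed channel returns immediately with the zero value, which is nil for a pointer type. A `for ... range` loop stops when the channel is closed, but a bare `<-ch` inside `select` does not. Assuming the Messages() channel is closed when the consumer shuts down (as the range-based original suggests), the two-value receive form detects this. The sketch below uses a hypothetical `message` type in place of `*sarama.ConsumerMessage` so that it runs on its own.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Topic  string
	Offset int64
}

// consume reads messages until the channel is closed or done fires, and
// returns how many messages it handled.
func consume(messages <-chan *message, done <-chan struct{}) int {
	handled := 0
	for {
		select {
		case m, ok := <-messages:
			if !ok {
				// Channel closed: m is nil here, so stop before touching it.
				return handled
			}
			_ = m.Topic // assumes the sender never sends a nil message
			handled++
		case <-done:
			return handled
		}
	}
}

func main() {
	ch := make(chan *message, 2)
	ch <- &message{Topic: "test_topic", Offset: 0}
	ch <- &message{Topic: "test_topic", Offset: 1}
	close(ch)
	fmt.Println("handled", consume(ch, nil)) // prints "handled 2"
}
```

Returning from the loop when `ok` is false also lets the statements after the loop, such as the final "Processed %d events." log in the program above, actually run.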
As of Kafka 0.9, users are encouraged to use Kafka broker addresses for auto-discovery, rather than the ZooKeeper connection details that the kafka.NewConsumerGroup() constructor expects.
Will wvanbergen/kafka support a more modern client constructor?
Which versions of Kafka has wvanbergen/kafka been tested against?
While there are public methods for initializing the offset manager and its config, JoinConsumerGroup() does not make it possible to set them. It assigns the offset manager internally to a private member, offsetManager. I don't see a way to set VerboseLogging in the consumer group API.
Given the following test program:
package main
import (
"fmt"
"log"
"os"
"time"
"github.com/wvanbergen/kafka/consumergroup"
"gopkg.in/Shopify/sarama.v1"
)
func main() {
sarama.Logger = log.New(os.Stdout, "[Sarama]", log.LstdFlags)
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetOldest
config.Offsets.ProcessingTimeout = 10 * time.Second
consumer, consumerErr := consumergroup.JoinConsumerGroup("FOO_TEST_CAN_GO", []string{"testproducercango"}, []string{"127.0.0.1"}, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
defer consumer.Close()
go func() {
for err := range consumer.Errors() {
log.Println(err)
}
}()
eventCount := 0
messProcessed := make(map[string]int)
StreamLoop:
for {
select {
case <-time.After(time.Second * 10):
break StreamLoop
case mess := <-consumer.Messages():
fmt.Printf("Got event from stream. Topic: %v, Partition: %v, Offset: %v, Mess: %v \n", mess.Topic, mess.Partition, mess.Offset, string(mess.Value))
eventCount += 1
// Simulate processing time
time.Sleep(2 * time.Second)
consumer.CommitUpto(mess)
messProcessed[string(mess.Value)] += 1
}
}
log.Printf("Processed %d events.", eventCount)
log.Printf("%+v", messProcessed)
}
With topic testproducercango having 20 messages called bladiebla1...bladiebla20 in 2 partitions, I get the following output when I start this program 4 times in parallel.
Pid 0:
[Sarama]2015/04/07 17:37:39 Initializing new client
[Sarama]2015/04/07 17:37:39 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:39 Successfully initialized new client
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Consumer instance registered (me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840).
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] Currently registered consumers: 1
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:39 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:39 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Partition consumer starting at the oldest available offset.
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 0, Mess: bladiebla2
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 1, Mess: bladiebla3
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/0 :: Last processed offset: 1. Waiting up to 10s for another 5 messages to process...
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: Last processed offset: -1. Waiting up to 10s for another 13 messages to process...
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: -1
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] FAILED closing the offset manager: Not all offsets were committed before shutdown was completed!
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/52c1ac213840] Deregistered consumer instance me-user:f3080828-f8b8-4e21-a3c4-52c1ac213840.
[Sarama]2015/04/07 17:37:55 Closing Client
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]
goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x2, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x0, 0x2, 0xc208381e00)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208041590, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
    /home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f
goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
    /usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f
goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f9731560bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
    /home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
Pid 1:
[Sarama]2015/04/07 17:37:45 Initializing new client
[Sarama]2015/04/07 17:37:45 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:45 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:45 Successfully initialized new client
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Consumer instance registered (me-user:7c3f576d-063f-4399-b346-509180e0075d).
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:45 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 2.
[Sarama]2015/04/07 17:37:45 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 2, Mess: bladiebla4
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 3, Mess: bladiebla8
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 4, Mess: bladiebla12
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 5, Mess: bladiebla14
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset 6
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Last processed offset: 4. Waiting up to 10s for another 2 messages to process...
Got event from stream. Topic: testproducercango, Partition: 0, Offset: 6, Mess: bladiebla19
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 0 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 3.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
[Sarama]2015/04/07 17:38:13 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: TIMEOUT waiting for offset 12. Last committed offset: 3
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] Currently registered consumers: 1
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Claiming 2 of 2 partitions
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:14 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Partition consumer starting at offset 4.
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 4, Mess: bladiebla7
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 5, Mess: bladiebla9
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 6, Mess: bladiebla10
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 7, Mess: bladiebla11
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 8, Mess: bladiebla13
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 9, Mess: bladiebla15
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 10, Mess: bladiebla16
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 11, Mess: bladiebla17
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 12, Mess: bladiebla18
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:50 [FOO_TEST_CAN_GO/509180e0075d] testproducercango/1 :: Stopping partition consumer at offset 12
2015/04/07 17:38:50 Processed 24 events.
2015/04/07 17:38:50 map[bladiebla13:2 bladiebla17:2 bladiebla6:1 bladiebla7:2 bladiebla16:2 bladiebla12:1 bladiebla8:1 bladiebla19:1 bladiebla11:2 bladiebla15:2 bladiebla4:1 bladiebla9:2 bladiebla10:2 bladiebla18:2 bladiebla14:1]
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:51 [FOO_TEST_CAN_GO/509180e0075d] Deregistered consumer instance me-user:7c3f576d-063f-4399-b346-509180e0075d.
[Sarama]2015/04/07 17:38:51 Closing Client
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:51 Closed connection to broker localhost:9092
Pid 2:
[Sarama]2015/04/07 17:37:52 Initializing new client
[Sarama]2015/04/07 17:37:52 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:52 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:52 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:52 Successfully initialized new client
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Consumer instance registered (me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7).
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:52 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/14644cac47d7] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:37:56 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:56 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango/0 :: Stopping partition consumer at offset -1
2015/04/07 17:38:02 Processed 0 events.
2015/04/07 17:38:02 map[]
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/14644cac47d7] Deregistered consumer instance me-user:1f1f7e20-dbff-4c01-89d6-14644cac47d7.
[Sarama]2015/04/07 17:38:02 Closing Client
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:02 Closed connection to broker localhost:9092
Pid 3:
[Sarama]2015/04/07 17:37:55 Initializing new client
[Sarama]2015/04/07 17:37:55 Fetching metadata for all topics from broker localhost:9092
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
[Sarama]2015/04/07 17:37:55 Registered new broker #0 at localhost:9092
[Sarama]2015/04/07 17:37:55 Successfully initialized new client
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Consumer instance registered (me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0).
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 3
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:37:55 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Partition consumer starting at the oldest available offset.
[Sarama]2015/04/07 17:37:55 Connected to broker localhost:9092
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 0, Mess: bladiebla0
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 1, Mess: bladiebla1
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 2, Mess: bladiebla5
Got event from stream. Topic: testproducercango, Partition: 1, Offset: 3, Mess: bladiebla6
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Triggering rebalance due to consumer list change
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Stopping partition consumer at offset 12
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/1 :: Last processed offset: 2. Waiting up to 10s for another 10 messages to process...
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] Currently registered consumers: 2
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Started topic consumer
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Claiming 1 of 2 partitions
[Sarama]2015/04/07 17:38:02 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Partition consumer starting at offset 7.
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango/0 :: Stopping partition consumer at offset -1
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] testproducercango :: Stopped topic consumer
[Sarama]2015/04/07 17:38:03 [FOO_TEST_CAN_GO/d91a72837cd0] Deregistered consumer instance me-user:6ebed8bf-285e-4f9b-9007-d91a72837cd0.
[Sarama]2015/04/07 17:38:03 Closing Client
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
[Sarama]2015/04/07 17:38:03 Closed connection to broker localhost:9092
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x0 pc=0x47a613]
goroutine 1 [running]:
github.com/wvanbergen/kafka/consumergroup.(*partitionOffsetTracker).markAsProcessed(0x0, 0x3, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:223 +0x123
github.com/wvanbergen/kafka/consumergroup.(*zookeeperOffsetManager).MarkAsProcessed(0xc20802c040, 0x6582b0, 0x11, 0x1, 0x3, 0xc208381700)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/offset_manager.go:152 +0x12b
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).CommitUpto(0xc208056210, 0xc208040fa0, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:232 +0xb1
main.main()
/home/me/Development/Dev/go/src/test/foo/foo.go:44 +0xc6f
goroutine 5 [semacquire]:
sync.(*WaitGroup).Wait(0xc20801e140)
/usr/local/go/src/sync/waitgroup.go:132 +0x169
github.com/samuel/go-zookeeper/zk.(*Conn).loop(0xc2080320d0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:227 +0x76d
github.com/samuel/go-zookeeper/zk.func·001()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:145 +0x2c
created by github.com/samuel/go-zookeeper/zk.ConnectWithDialer
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:149 +0x44f
goroutine 7 [runnable]:
github.com/samuel/go-zookeeper/zk.(*Conn).sendLoop(0xc2080320d0, 0x7f7bc8842bb8, 0xc208038038, 0xc2080302a0, 0x0, 0x0)
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:412 +0xce9
github.com/samuel/go-zookeeper/zk.func·002()
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:212 +0x5a
created by github.com/samuel/go-zookeeper/zk.(*Conn).loop
/home/me/Development/Dev/go/src/github.com/samuel/go-zookeeper/zk/conn.go:215 +0x680
goroutine 17 [syscall, locked to thread]:
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:2232 +0x1
Issues in bold. For now I see two:
markAsProcessed acts on already closed partition
I'm trying to debug an issue where it looks like
1.) The Zookeeper connection dies/times out
2015/09/14 10:39:45 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:39:53 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:40:01 read tcp 10.129.196.55:2181: i/o timeout
2015/09/14 10:40:09 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:49:12 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:49:13 read tcp 10.129.196.55:2181: i/o timeout
2015/09/14 10:49:13 Failed to set previous watches: zk: connection closed
2015/09/14 10:49:14 read tcp 10.129.196.49:2181: i/o timeout
2015/09/14 10:49:14 Failed to set previous watches: zk: connection closed
2015/09/14 10:49:14 read tcp 10.129.196.11:2181: i/o timeout
2015/09/14 10:49:14 Failed to set previous watches: zk: connection closed
2.) A rebalance does not occur (on any of the other nodes) so the partitions are left without a consumer.
I'm not sure if this is an issue with ZK or consumer group. Also I'm using consumergroup from e236a65 with my PRs added (meaning I don't have any other fixes applied - I'm not sure if my problem is solved with those fixes).
When the consumer group closes, it should commit the offset before quitting.
The case messages <- message
here at https://github.com/wvanbergen/kafka/blob/master/consumergroup/consumer_group.go#L413
appears to succeed, but I see situations where message == nil. This causes a problem when message.Offset is accessed in the case arm. I have not tried to further characterize what makes message == nil.
I'm using Go 1.5.1 on Ubuntu 14. For my present set of tests, I'm also compiling and running with -race, although I have not run anything to rule these situations in or out.
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x48 pc=0x5472e0]
goroutine 167 [running]:
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).partitionConsumer(0xc8201745a0, 0xc82013b180, 0x11, 0x0, 0xc8204e0420, 0xc8204e0480, 0xc8206a4b50, 0xc8206709c0)
/go/src/xxxx/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:435 +0x1580
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer
/go/src/xxxx/Godeps/_workspace/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:341 +0xbd6
{
"ImportPath": "github.com/wvanbergen/kafka/consumergroup",
"Rev": "1ff806bb203e104e04d49189220d267d37e46758"
},
{
"ImportPath": "github.com/Shopify/sarama",
"Comment": "v1.8.0-49-g89bd629",
"Rev": "89bd629b50e1a69c40d40ded1c82a767b4859442"
},
kafka 0.9.0.0
The process panics every 10 minutes.
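One possible explanation, which is an assumption and not something the report confirms, is that the message was read from a channel that had already been closed: a receive on a closed channel of pointers yields nil. The sketch below uses a hypothetical `message` type in place of `*sarama.ConsumerMessage` and shows the two-value receive that guards against this.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *sarama.ConsumerMessage.
type message struct {
	Offset int64
}

// drain reads from in until it is closed and returns the offsets seen.
// The two-value receive tells a closed channel (ok == false, msg == nil)
// apart from a real message.
func drain(in <-chan *message) []int64 {
	var offsets []int64
	for {
		msg, ok := <-in
		if !ok || msg == nil {
			// Channel closed, or a nil slipped through: stop here
			// instead of dereferencing msg.Offset.
			return offsets
		}
		offsets = append(offsets, msg.Offset)
	}
}

func main() {
	in := make(chan *message, 2)
	in <- &message{Offset: 41}
	in <- &message{Offset: 42}
	close(in)
	fmt.Println(drain(in))
}
```

If this is indeed the cause, checking `ok` (or `message == nil`) before forwarding the message would avoid the dereference in the case arm.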
Hi
I have run into a bug: I have 32 partitions in one topic, and two consumers using the same group on this topic. When consumer A starts, it consumes all 32 partitions, as expected. But when the second consumer B starts, B tries to consume 16 partitions that have not yet been yielded by consumer A.
So I added some retry code to work around this bug while waiting for the author's official fix. Anyone else who has run into this bug can use this code temporarily.
In github.com/wvanbergen/kafka/consumergroup/consumer_group.go, line 339, function partitionConsumer:
change code from:
err := cg.instance.ClaimPartition(topic, partition)
to
retry_sec := 10
var err error
for i:=0; i< retry_sec; i++ {
err = cg.instance.ClaimPartition(topic, partition)
if err == nil {
break
}
cg.Logf("%s/%d :: Retry to claim the partition: %s\n", topic, partition, err)
time.Sleep(1*time.Second)
}
Thanks.
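The same idea can be factored into a small helper. This is only a sketch under stated assumptions: `retry` and `errClaimed` are hypothetical names, and the `claim` closure stands in for `cg.instance.ClaimPartition`, which is not called here.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errClaimed is a hypothetical error used only for this illustration.
var errClaimed = errors.New("already claimed by another instance")

// retry calls fn up to attempts times, sleeping delay between failures.
// It returns nil on the first success, or the last error otherwise.
func retry(attempts int, delay time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return err
}

func main() {
	calls := 0
	// claim fails twice, as if another instance still held the partition.
	claim := func() error {
		calls++
		if calls < 3 {
			return errClaimed
		}
		return nil
	}
	err := retry(10, 10*time.Millisecond, claim)
	fmt.Println(calls, err)
}
```

A bounded retry still gives up eventually, so the caller needs to decide what to do when the last attempt fails.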
I start multiple processes which join the same consumer group.
If there are processes that are currently consuming kafka messages while a new consumer process is spawned, often some of the existing processes panic and crash with a null pointer exception (NPE) when I try to commit the latest offset (CommitUpto).
The NPE occurs because the partition has been deleted from the zookeeperOffsetManager in the following function:
func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
zom.l.RLock()
defer zom.l.RUnlock()
return zom.offsets[topic][partition].markAsProcessed(offset)
}
My local work around has been to add a check and short circuit:
func (zom *zookeeperOffsetManager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
zom.l.RLock()
defer zom.l.RUnlock()
currentOffset := zom.offsets[topic][partition]
if currentOffset == nil {
return false
}
return currentOffset.markAsProcessed(offset)
}
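To show why the lookup yields nil and that the guard is enough to avoid the panic, here is a self-contained sketch. The `tracker` and `manager` types are hypothetical, simplified stand-ins for `partitionOffsetTracker` and `zookeeperOffsetManager`; the real bookkeeping in the library is not reproduced here.

```go
package main

import (
	"fmt"
	"sync"
)

// tracker is a simplified, hypothetical per-partition offset tracker.
type tracker struct {
	mu            sync.Mutex
	lastProcessed int64
}

func (t *tracker) markAsProcessed(offset int64) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if offset > t.lastProcessed {
		t.lastProcessed = offset
		return true
	}
	return false
}

// manager is a simplified, hypothetical offset manager.
type manager struct {
	mu      sync.RWMutex
	offsets map[string]map[int32]*tracker
}

// MarkAsProcessed reports false when the partition is no longer tracked.
func (m *manager) MarkAsProcessed(topic string, partition int32, offset int64) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// Indexing a nil inner map is safe in Go and yields the zero value,
	// so a missing topic and a missing partition both give a nil *tracker.
	t := m.offsets[topic][partition]
	if t == nil {
		return false
	}
	return t.markAsProcessed(offset)
}

func main() {
	m := &manager{offsets: map[string]map[int32]*tracker{
		"logs": {0: &tracker{lastProcessed: -1}},
	}}
	fmt.Println(m.MarkAsProcessed("logs", 0, 5)) // tracked partition
	fmt.Println(m.MarkAsProcessed("logs", 1, 5)) // partition already released
	fmt.Println(m.MarkAsProcessed("none", 0, 5)) // unknown topic
}
```

Returning false hides the fact that the commit was dropped, so callers that care may want to log it.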
When I start a new process that joins the same consumer group, it fails with the following message.
I think this is because some existing processes have not yet released their partitions when the new consumer is spawned.
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] Currently registered consumers: 3
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs :: Started topic consumer
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs :: Claiming 7 of 20 partitions
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/0 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/9 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/6 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/15 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/3 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/05/23 14:28:43 [logstash/2b5ec6db90f5] logs/18 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
As a workaround I have added a wait here.
func (cg *ConsumerGroup) topicListConsumer(topics []string) {
for {
select {
case <-cg.stopper:
return
default:
}
I used to run a consumer (just like examples/main.go) on one node (Node A), but the CPU usage was too high, so I copied my program to another node (Node B). Now I have two consumers (they consume the same topic) with the same configuration.
But here comes the problem. I find that the second node's (Node B's) CPU usage is very high, just as when the consumer ran on a single node, while the first node's (Node A's) CPU usage is almost zero.
In my opinion, the two nodes should consume the same topic and their CPU usage should be about the same.
But the result is not what I expected. Can you point out whether I am doing anything wrong?
I would appreciate it. Thank you anyway.
Is it supported to run multiple consumers of the same consumer group concurrently in a single Go process? I run into the following error; any ideas? Thanks!
panic: runtime error: index out of range
goroutine 4395258 [running]:
panic(0x7e1ec0, 0xc820010090)
/usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/wvanbergen/kazoo-go.(*ConsumergroupInstanceList).Less(0xc8df7bc480, 0x3, 0xffffffffffffffff, 0x1)
:10 +0x124
sort.doPivot(0x7fa8cba22338, 0xc8df7bc480, 0x3, 0x18, 0x9, 0x3)
/usr/local/go/src/sort/sort.go:128 +0x27b
sort.quickSort(0x7fa8cba22338, 0xc8df7bc480, 0x3, 0x18, 0x8)
/usr/local/go/src/sort/sort.go:195 +0xa3
sort.Sort(0x7fa8cba22338, 0xc8df7bc480)
/usr/local/go/src/sort/sort.go:229 +0x74
github.com/wvanbergen/kafka/consumergroup.dividePartitionsBetweenConsumers(0xc8f8824540, 0x18, 0x18, 0xc8bac5fc00, 0x3c, 0x3c, 0x0)
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/utils.go:40 +0x192
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer(0xc82479a090, 0x887910, 0xb, 0xc82649d080, 0xc82649d0e0, 0xc86102b740)
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:333 +0x850
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicListConsumer
/Users/lhan/Dev/gopath/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:271 +0x40a
I tried the example against a working Kafka cluster, but it failed. The topic and Zookeeper configurations are correct.
-group="click1": The name of the consumer group, used for coordination and load balancing
-topics="bjcnc_nginx_nginx_newmysql0": The comma-separated list of topics to consume
-zookeeper="10.16.13.133:2181,10.16.13.134:2181,10.16.13.135:2181,10.16.13.136:2181,10.16.13.137:2181/realtime-monitor-newmysql": A comma-separated Zookeeper connection string (e.g. zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181)
2016/10/22 17:30:38 Before JoinConsuerGroup, consumerGroup: click1 , nodes: [10.16.13.133:2181 10.16.13.134:2181 10.16.13.135:2181 10.16.13.136:2181 10.16.13.137:2181] , root: /realtime-monitor-newmysql
2016/10/22 17:30:38 Connected to 10.16.13.137:2181
2016/10/22 17:30:38 Authenticated: id=384865011391368701, timeout=4000
2016/10/22 17:30:38 Re-submitting 0 credentials after reconnect
2016/10/22 17:30:38 Recv loop terminated: err=EOF
2016/10/22 17:30:38 zk: node does not exist
Here's my code.
`package main
import (
"flag"
"log"
"os"
"os/signal"
"strings"
"time"
"fmt"
"github.com/Shopify/sarama"
"github.com/wvanbergen/kafka/consumergroup"
"github.com/wvanbergen/kazoo-go"
)
const (
DefaultKafkaTopics = "kafka_performance_test"
DefaultConsumerGroup = "testsarama1"
MAX_COUNT = 1000000
)
var (
consumerGroup = flag.String("group", DefaultConsumerGroup, "The name of the consumer group, used for coordination and load balancing")
kafkaTopicsCSV = flag.String("topics", DefaultKafkaTopics, "The comma-separated list of topics to consume")
zookeeper = flag.String("zookeeper", "openlive-kafka-online001-bjlt.qiyi.virtual:2181,openlive-kafka-online002-bjlt.qiyi.virtual:2181,openlive-kafka-online003-bjlt.qiyi.virtual:2181,openlive-kafka-online004-bjlt.qiyi.virtual:2181,openlive-kafka-online005-bjlt.qiyi.virtual:2181", "A comma-separated Zookeeper connection string (e.g. zookeeper1.local:2181,zookeeper2.local:2181,zookeeper3.local:2181)")
zookeeperNodes []string
)
func init() {
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}
func main() {
flag.Parse()
if *zookeeper == "" {
flag.PrintDefaults()
os.Exit(1)
}
config := consumergroup.NewConfig()
config.Offsets.Initial = sarama.OffsetNewest
config.Offsets.ProcessingTimeout = 10 * time.Second
zookeeperNodes, config.Zookeeper.Chroot = kazoo.ParseConnectionString(*zookeeper)
kafkaTopics := strings.Split(*kafkaTopicsCSV, ",")
consumer, consumerErr := consumergroup.JoinConsumerGroup(*consumerGroup, kafkaTopics, zookeeperNodes, config)
if consumerErr != nil {
log.Fatalln(consumerErr)
}
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
<-c
if err := consumer.Close(); err != nil {
sarama.Logger.Println("Error closing the consumer", err)
}
}()
go func() {
for err := range consumer.Errors() {
log.Println(err)
}
}()
eventCount := 0
offsets := make(map[string]map[int32]int64)
start_time := time.Now().UnixNano() / 1000000
fmt.Println(len(consumer.Messages()))
for message := range consumer.Messages() {
fmt.Println(string(message.Value))
if offsets[message.Topic] == nil {
offsets[message.Topic] = make(map[int32]int64)
}
if eventCount > MAX_COUNT {
break
}
fmt.Println("eventCount:", eventCount)
eventCount += 1
if offsets[message.Topic][message.Partition] != 0 && offsets[message.Topic][message.Partition] != message.Offset-1 {
log.Printf("Unexpected offset on %s:%d. Expected %d, found %d, diff %d.\n", message.Topic, message.Partition, offsets[message.Topic][message.Partition]+1, message.Offset, message.Offset-offsets[message.Topic][message.Partition]+1)
}
// Simulate processing time
// time.Sleep(10 * time.Millisecond)
offsets[message.Topic][message.Partition] = message.Offset
consumer.CommitUpto(message)
}
fmt.Println("total_count: ", eventCount)
end_time := time.Now().UnixNano() / 1000000
process_time := end_time - start_time
fmt.Println("process_time: ", process_time)
}
and my output follows:
2018/01/09 17:15:05 Connected to 10.13.44.23:2181
2018/01/09 17:15:05 Authenticated: id=170991064662003625, timeout=6000
[Sarama] 2018/01/09 17:15:05 Initializing new client
[Sarama] 2018/01/09 17:15:05 client/metadata fetching metadata for all topics from broker openlive-kafka-online005-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (unregistered)
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #5 at openlive-kafka-online005-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #10 at openlive-kafka-online014-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #1 at openlive-kafka-online001-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #6 at openlive-kafka-online010-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #9 at openlive-kafka-online013-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #2 at openlive-kafka-online002-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #7 at openlive-kafka-online011-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #3 at openlive-kafka-online003-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #8 at openlive-kafka-online012-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 client/brokers registered new broker #4 at openlive-kafka-online004-bjlt.qiyi.virtual:9092
[Sarama] 2018/01/09 17:15:05 Successfully initialized new client
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:90c7388b-c001-45bc-8dc9-d2a251a23aba).
len(consumer.Messages()): 0
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] Currently registered consumers: 1
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Started topic consumer
[Sarama] 2018/01/09 17:15:05 [testsarama1/d2a251a23aba] kafka_performance_test :: Claiming 10 of 10 partitions
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/1 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/6 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online005-bjlt.qiyi.virtual:9092 (registered as #5)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/0 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/3 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online014-bjlt.qiyi.virtual:9092 (registered as #10)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online004-bjlt.qiyi.virtual:9092 (registered as #4)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/8 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/5 added subscription to kafka_performance_test/1
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/9 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online002-bjlt.qiyi.virtual:9092 (registered as #2)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online011-bjlt.qiyi.virtual:9092 (registered as #7)
[Sarama] 2018/01/09 17:15:06 consumer/broker/10 added subscription to kafka_performance_test/6
[Sarama] 2018/01/09 17:15:06 consumer/broker/4 added subscription to kafka_performance_test/0
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online003-bjlt.qiyi.virtual:9092 (registered as #3)
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/5 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/2 added subscription to kafka_performance_test/8
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/4 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 consumer/broker/7 added subscription to kafka_performance_test/3
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/7 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online013-bjlt.qiyi.virtual:9092 (registered as #9)
[Sarama] 2018/01/09 17:15:06 consumer/broker/3 added subscription to kafka_performance_test/9
[Sarama] 2018/01/09 17:15:06 [testsarama1/d2a251a23aba] kafka_performance_test/2 :: Partition consumer listening for new messages only.
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online001-bjlt.qiyi.virtual:9092 (registered as #1)
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online012-bjlt.qiyi.virtual:9092 (registered as #8)
[Sarama] 2018/01/09 17:15:06 consumer/broker/9 added subscription to kafka_performance_test/5
[Sarama] 2018/01/09 17:15:06 Connected to broker at openlive-kafka-online010-bjlt.qiyi.virtual:9092 (registered as #6)
[Sarama] 2018/01/09 17:15:06 consumer/broker/1 added subscription to kafka_performance_test/7
[Sarama] 2018/01/09 17:15:06 consumer/broker/8 added subscription to kafka_performance_test/4
[Sarama] 2018/01/09 17:15:06 consumer/broker/6 added subscription to kafka_performance_test/2`
As you can see, I did not receive anything from Kafka, and the length of consumer.Messages() is 0. I don't know what mistake I have made; please help me. Thanks!
I also find that if I switch to a new consumer group, I encounter the following error, and I don't know how to fix it.
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumergroup testsarama3 does not yet exists, creating...
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Consumer instance registered (hcdn-others-worker-dev100-bjlt.qiyi.virtual:0cfbc8c2-5203-4d3d-8283-77ab52d3afbb).
len(consumer.Messages()): 0
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] Currently registered consumers: 1
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Started topic consumer
[Sarama] 2018/01/09 17:39:28 [testsarama3/77ab52d3afbb] kafka_performance_test :: Claiming 10 of 10 partitions
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/5 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/3 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/7 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/6 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/2 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/8 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/1 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/9 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
[Sarama] 2018/01/09 17:39:29 [testsarama3/77ab52d3afbb] kafka_performance_test/4 :: FAILED to claim partition on attempt 1 of 12; retrying in 1 second. Error: zk: node already exists
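A note on the `len(consumer.Messages())` check in the post above: in Go, `len` on a channel reports only how many elements are currently queued in its buffer, so 0 right after joining does not by itself indicate a problem. Whether the library's message channel is buffered is not stated here; the sketch below uses plain channels and a hypothetical `pending` helper to show the behaviour.

```go
package main

import "fmt"

// pending reports how many elements are queued in ch right now.
func pending(ch <-chan string) int {
	return len(ch)
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 4)

	// Nothing has been sent yet, so both report zero.
	fmt.Println(pending(unbuffered), pending(buffered))

	buffered <- "a"
	buffered <- "b"
	// Only values sitting in the buffer are counted.
	fmt.Println(pending(unbuffered), pending(buffered))
}
```

Ranging over the channel, as the program already does, is the reliable way to find out whether messages arrive.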
Reproduction Steps:
Stack dump:
[2015-09-18T20:21:24.991Z] [GEARD] [geard.go:50] [INFO] PANIC: runtime error: invalid memory address or nil pointer dereference
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
(*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
(*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
(*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
(*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)
/go/src/github.com/channelmeter/geard/geard.go:410 (0x4063e2)
func.012: harvestQueue.Ack(harvTask)
/usr/src/go/src/runtime/asm_amd64.s:401 (0x443aa5)
call16: CALLFN(·call16, 16)
/usr/src/go/src/runtime/panic.go:387 (0x41a9a8)
gopanic: reflectcall(unsafe.Pointer(d.fn), deferArgs(d), uint32(d.siz), uint32(d.siz))
/usr/src/go/src/runtime/panic.go:42 (0x419cce)
panicmem: panic(memoryError)
/usr/src/go/src/runtime/sigpanic_unix.go:26 (0x420734)
sigpanic: panicmem()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:223 (0x759ef3)
(*partitionOffsetTracker).markAsProcessed: pot.l.Lock()
/go/src/github.com/channelmeter/kafka/consumergroup/offset_manager.go:152 (0x75942c)
(*zookeeperOffsetManager).MarkAsProcessed: return zom.offsets[topic][partition].markAsProcessed(offset)
/go/src/github.com/channelmeter/kafka/consumergroup/consumer_group.go:235 (0x755814)
(*ConsumerGroup).CommitUpto: cg.offsetManager.MarkAsProcessed(message.Topic, message.Partition, message.Offset)
/go/src/github.com/channelmeter/geard/harvester/kafka.go:83 (0x475b75)
(*kafkaHarvester).Ack: return rq.consumer.CommitUpto(msg)
I'm guessing here, the consumer can no longer Commit the selected message because it no longer owns the partition. What should be done in this case? It might make most sense to drop the commit seeing how the message will be reprocessed anyways, or somehow let the application know that a rebalance occurred and your messages don't matter.
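One way to "let the application know" is to return a sentinel error from the commit path. The sketch below is hypothetical throughout: `ErrPartitionNotOwned`, `owner`, and `Commit` are illustrative names and not part of the library, and the type is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrPartitionNotOwned is a hypothetical sentinel error.
var ErrPartitionNotOwned = errors.New("partition no longer owned by this instance")

// owner is a hypothetical, simplified view of the partitions this
// instance currently holds, mapped to their last committed offset.
type owner struct {
	committed map[int32]int64
}

// Commit records offset for partition, or reports that the partition
// was given up (for example during a rebalance).
func (o *owner) Commit(partition int32, offset int64) error {
	if _, ok := o.committed[partition]; !ok {
		return ErrPartitionNotOwned
	}
	o.committed[partition] = offset
	return nil
}

func main() {
	o := &owner{committed: map[int32]int64{0: -1}}
	for _, p := range []int32{0, 1} {
		err := o.Commit(p, 10)
		switch {
		case errors.Is(err, ErrPartitionNotOwned):
			fmt.Printf("partition %d: commit dropped, message will be redelivered\n", p)
		case err != nil:
			fmt.Printf("partition %d: %v\n", p, err)
		default:
			fmt.Printf("partition %d: committed\n", p)
		}
	}
}
```

With this shape the caller can choose between silently dropping the commit and aborting in-flight work for that partition.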
(Moved from #61)
Actually I was looking into this because I was having an issue where 2 of my nodes would stop accepting requests. I think this might be related - when my 9th node comes up one node gives up all its partitions, and another node tries to claim those partitions and fails:
It looks like this might be a data race?
Node A tries to grab 16, 17, 18, 19 and fails.
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] Triggering rebalance due to consumer list change
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/14 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/15 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/12 :: Stopping partition consumer at offset 44
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user/13 :: Stopping partition consumer at offset 43
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/13
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/14
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/15
[Sarama] 2015/07/05 02:10:35 consumer/broker/40770 closed dead subscription to geard-user/12
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] Currently registered consumers: 9
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Started topic consumer
[Sarama] 2015/07/05 02:10:35 [geard/bacc9b9f50bb] geard-user :: Claiming 4 of 32 partitions
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/16 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/17 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/18 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user/19 :: FAILED to claim the partition: Cannot claim partition: it is already claimed by another instance
[Sarama] 2015/07/05 02:10:36 [geard/bacc9b9f50bb] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:18:46 client/metadata fetching metadata for all topics from broker 10.129.196.48:9092
Node B lets go of 16, 17, 18, 19, possibly after Node A tries to acquire them.
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] Triggering rebalance due to consumer list change
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/16 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/17 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/18 :: Stopping partition consumer at offset -1
[Sarama] 2015/07/05 02:10:35 [geard/31c73a8faa4c] geard-user/19 :: Stopping partition consumer at offset 44
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/18
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/19
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/16
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 closed dead subscription to geard-user/17
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Stopped topic consumer
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] Currently registered consumers: 9
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Started topic consumer
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user :: Claiming 4 of 32 partitions
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/20 :: Partition consumer starting at offset 37.
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/21 :: Partition consumer starting at offset 50.
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 added subscription to geard-user/20
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/22 :: Partition consumer starting at offset 57.
[Sarama] 2015/07/05 02:10:36 [geard/31c73a8faa4c] geard-user/23 :: Partition consumer starting at offset 38.
[Sarama] 2015/07/05 02:10:36 consumer/broker/40770 added subscription to geard-user/21
[Sarama] 2015/07/05 02:10:37 consumer/broker/40770 added subscription to geard-user/23
[Sarama] 2015/07/05 02:10:37 consumer/broker/40770 added subscription to geard-user/22
[Sarama] 2015/07/05 02:18:42 client/metadata fetching metadata for all topics from broker 10.129.196.48:9092
It looks like the naive fix would be to sleep for a second in topicListConsumer(). Something other than Sleep
would probably be a better way to solve this race condition, but unfortunately I don't yet have a great understanding of how consumergroups work.
Or, retry claiming a set number of times?
When I test the program, I just use Ctrl+C to kill it.
Is there a graceful way?
Or, what is the best practice for exiting the program?
First of all, thanks for writing and sharing this library! It looks like a promising
option for writing multi-node Kafka consumers in Go.
One possible issue I have is the ability to reliably process messages. I'd like
to be able to guarantee that I don't commit an offset before I've positively
confirmed that all of the corresponding messages have been processed
successfully (i.e. not just passed into the processing code, but successfully
completed the associated application-level processing). IIUC the current
API doesn't support this. After you receive an event from cg.Stream(),
the corresponding offset could be committed by the client at any time in the
background.
Do I understand this correctly? If so, is reliable processing something that
you'd be interested in supporting in the high-level API?
Hi,
I found a problem: when I restart my consumers, a partition ends up with no owner.
The consumer group triggers a rebalance when I stop a consumer. The timeout in finalizePartition is cg.config.Offsets.ProcessingTimeout, and the retry time during rebalancing is also cg.config.Offsets.ProcessingTimeout.
Before a consumer stops, it should finish the processing it was doing and run finalizePartition. This may take longer than cg.config.Offsets.ProcessingTimeout, so rebalancing may fail and leave some partitions without an owner.
To solve this problem, maybe we can add a goroutine that watches each partition's owner. It could also avoid problems when the number of partitions changes, for example when Kafka broker capacity is expanded.
The issue I'm running into is on our local development boxes: I'd like to handle the situation where the partitions are not yet set up for a Kafka topic:
[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] Currently registered consumers: 1
[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] session :: Started topic consumer
[Sarama] 2015/03/12 18:11:25 [consumer/a0103be9d1f2] session :: FAILED to get list of partitions: zk: node does not exist
I believe that is saying that it's registering the consumer, but Zookeeper doesn't know about the partitions for the topic. Unfortunately it seems to print this message to the stdout log and continue on. There don't appear to be any messages on any error channels or any errors coming back, as this is returned from a goroutine.
It would be great to detect if this scenario has occurred, and be able to handle the case where my consumers come up before topics and partitions are created (for local development with docker in particular).
Simplified it into the following scenario:
3 brokers. 1 topic with 4 partitions. 2 consumer instances to consume that topic. The start index of broker, partition and consumer is 0.
When c0 (consumer instance 0) calls utils.go#dividePartitionsBetweenConsumers(), the leaders are like:
After sorting (by leader, then by partition id), partitions looks like:
[
{p0 b0 xxxx} // {partition leader address}
{p3 b0 xxxx}
{p1 b1 xxxx}
{p2 b2 xxxx}
]
So c0 gets p0 and p3 as its myPartitions (to claim).
Then p0 somehow changes its leader to b2. The leaders are like:
And another consumer instance c1 calls utils.go#dividePartitionsBetweenConsumers().
After sorting (by leader, then by partition id), partitions looks like:
[
{p3 b0 xxxx}
{p1 b1 xxxx}
{p0 b2 xxxx}
{p2 b2 xxxx}
]
c1 gets p0 and p2 as its myPartitions (to claim).
As a result, c0 tries to claim p0 and p3 while c1 tries to claim p0 and p2.
In utils.go, partitionLeaders is sorted by leader first.
func (pls partitionLeaders) Less(i, j int) bool {
	return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}
When the leader changes between two calls of dividePartitionsBetweenConsumers(), the sort order changes. I believe this is the root cause.
The latest sarama client has split its consumers' output into two channels, Messages and Errors, rather than the single Events channel for both. The latest consumergroup code has not been upgraded to that interface. Any plans on updating?
Hey!
Seeing that you are one of the biggest contributors to the Shopify/sarama repository right now, should I expect this code to be merged into sarama's main at some point?
I'm using sarama in production and I need to implement consumer groups.
I can probably even contribute to this repo a bit; I would just like to do it for a repository that has some prospect of future support :).
I see there is another repository from bsm, but from what I see in the comments, you have no plans to make it a part of Sarama.
One more thing: do you plan to make it possible to pass already created consumers/partition consumers in to join a consumer group? This would make at least the intermediate stages of upgrading from sarama to this lib much easier.
Hey,
I just wanted to raise this issue for awareness' sake. We use the latest version of Kafka, and your library has worked excellently for us with past versions of Kafka. With the newer version, sometimes messages are not consumed from a partition even though it has an owner. I tried restarting the program, assuming I was blocking on some error channel, but to no avail. Would you say this library is compatible with newer versions of Kafka, knowing that sarama has no support for 0.11?
In our use case for Kafka consumer groups, we need to store
Zookeeper consumer state outside of the Zookeeper root, i.e.
in a Zookeeper "chroot". For example:
/kafka/consumers/%s/offsets/%s/%d
where /kafka is the chroot. Unfortunately I don't think this is possible
currently. Would you be open to introducing support for this in this library?
The underlying Zookeeper library doesn't have Zookeeper chroot support,
but since we wrap the connection already I think we could add it at the
level of this library.
I would be happy to write the patch for this if you were interested.
I'm not positive what the API design should be, but one possibility is
adding a Chroot field to ConsumerGroupConfig and threading it
through the various Zookeeper calls. Open to other approaches
though!
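A minimal sketch of what "threading it through" might amount to for a single path, assuming the chroot is just a prefix applied before every Zookeeper call. withChroot is a hypothetical helper, and the group and topic names are made up for the example.

```go
package main

import (
	"fmt"
	"path"
)

// withChroot prefixes a Zookeeper node path with an optional chroot.
// An empty chroot leaves the node path as it was (after path.Join cleans it).
func withChroot(chroot, node string) string {
	return path.Join("/", chroot, node)
}

func main() {
	// Same layout as the offset path above, with example values filled in.
	node := fmt.Sprintf("/consumers/%s/offsets/%s/%d", "my-group", "my-topic", 3)

	fmt.Println(withChroot("/kafka", node)) // chroot applied
	fmt.Println(withChroot("", node))       // no chroot configured
}
```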
What does it mean and what should I do?
We had an issue with the connection from our consumers to the Kafka brokers. The issue lasted for a couple of hours. The log from this time is below:
2017-11-11 00:14:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:29 UTC | ERROR: kafka.go:518: Kafka consumer error: zk: could not connect to a server
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: Stopping partition consumer at offset 38214590710
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: Stopping partition consumer at offset 4394485987
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: Stopping partition consumer at offset 4397716347
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: Stopping partition consumer at offset 38266406728
2017-11-11 00:14:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: Stopping partition consumer at offset 4380970705
2017-11-11 00:14:30 UTC | consumer/broker/8 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka8:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/0: read tcp 10.128.8.79:47780->10.1.3.144:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/2 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka2:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/3: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/1: read tcp 10.128.8.79:44170->10.1.3.236:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/1 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka1:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/2: read tcp 10.128.8.79:37892->10.1.3.235:9092: i/o timeout
2017-11-11 00:14:30 UTC | consumer/broker/3 disconnecting due to error processing FetchRequest: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:30 UTC | Closed connection to broker kafka3:9092
2017-11-11 00:14:30 UTC | kafka: error while consuming [KAFKA_TOPIC]/4: read tcp 10.128.8.79:48368->10.1.3.195:9092: i/o timeout
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/0 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/1 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/3 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/2 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:32 UTC | consumer/[KAFKA_TOPIC]/4 finding new broker
2017-11-11 00:14:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka4:9092
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/3 :: FAILED to commit offset 4380970705 to Zookeeper. Last committed offset: 4380970443
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/0 :: FAILED to commit offset 38214590710 to Zookeeper. Last committed offset: 38214590466
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/4 :: FAILED to commit offset 4394485987 to Zookeeper. Last committed offset: 4394485910
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:33 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/1 :: FAILED to commit offset 38266406728 to Zookeeper. Last committed offset: 38266406473
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:37 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] [KAFKA_TOPIC]/2 :: FAILED to commit offset 4397716347 to Zookeeper. Last committed offset: 4397716273
2017-11-11 00:14:41 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:14:45 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:14:49 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:14:53 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:14:57 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:01 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | Closed connection to broker kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:02 UTC | client/metadata got error from broker while fetching metadata: read tcp 10.128.8.79:51248->10.1.3.196:9092: i/o timeout
2017-11-11 00:15:02 UTC | client/brokers deregistered broker #-1 at kafka4:9092
2017-11-11 00:15:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka3:9092
2017-11-11 00:15:05 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:09 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:13 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 00:15:17 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 00:15:21 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 00:15:25 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 00:15:29 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 00:15:32 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
2017-11-11 00:15:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka10:9092
2017-11-11 00:15:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 00:15:32 UTC | client/brokers deregistered broker #-1 at kafka3:9092
.
.
. *trying to commit offset and failure to fetch metadata continues...*
.
.
2017-11-11 01:39:02 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka1:9092
2017-11-11 01:39:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 01:39:02 UTC | client/brokers deregistered broker #-1 at kafka7:9092
2017-11-11 01:39:02 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 01:39:06 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:10 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:14 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4394485987 for [KAFKA_TOPIC]/4!
2017-11-11 01:39:18 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4397716347 for [KAFKA_TOPIC]/2!
2017-11-11 01:39:22 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38266406728 for [KAFKA_TOPIC]/1!
2017-11-11 01:39:26 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 38214590710 for [KAFKA_TOPIC]/0!
2017-11-11 01:39:30 UTC | [[CONSUMER_GROUP_NAME]/2a598e119a01] FAILED to commit offset 4380970705 for [KAFKA_TOPIC]/3!
2017-11-11 01:39:32 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka2:9092
2017-11-11 01:39:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 01:39:32 UTC | client/brokers deregistered broker #-1 at kafka1:9092
2017-11-11 01:39:32 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 01:40:02 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka0:9092
2017-11-11 01:40:02 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 01:40:02 UTC | client/brokers deregistered broker #-1 at kafka2:9092
2017-11-11 01:40:02 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 01:40:32 UTC | Failed to connect to broker kafka0:9092: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for [[KAFKA_TOPIC]] from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
2017-11-11 01:40:32 UTC | client/metadata fetching metadata for all topics from broker kafka9:9092
2017-11-11 01:40:32 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.234:9092: i/o timeout
2017-11-11 01:40:32 UTC | client/brokers deregistered broker #-1 at kafka0:9092
.
.
. *no failure messages to commit offset anymore. but the brokers are still away*
.
.
2017-11-11 04:55:08 UTC | client/metadata retrying after 250ms... (2 attempts remaining)
2017-11-11 04:55:09 UTC | client/metadata fetching metadata for all topics from broker kafka4:9092
2017-11-11 04:55:39 UTC | Failed to connect to broker kafka4:9092: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.196:9092: i/o timeout
2017-11-11 04:55:39 UTC | client/metadata fetching metadata for all topics from broker kafka3:9092
2017-11-11 04:56:09 UTC | Failed to connect to broker kafka3:9092: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.195:9092: i/o timeout
2017-11-11 04:56:09 UTC | client/metadata fetching metadata for all topics from broker kafka10:9092
2017-11-11 04:56:39 UTC | Failed to connect to broker kafka10:9092: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.146:9092: i/o timeout
2017-11-11 04:56:39 UTC | client/metadata fetching metadata for all topics from broker kafka7:9092
2017-11-11 04:57:09 UTC | Failed to connect to broker kafka7:9092: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.143:9092: i/o timeout
2017-11-11 04:57:09 UTC | client/metadata fetching metadata for all topics from broker kafka1:9092
2017-11-11 04:57:39 UTC | Failed to connect to broker kafka1:9092: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.235:9092: i/o timeout
2017-11-11 04:57:39 UTC | client/metadata fetching metadata for all topics from broker kafka2:9092
2017-11-11 04:58:09 UTC | Failed to connect to broker kafka2:9092: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata got error from broker while fetching metadata: dial tcp 10.1.3.236:9092: i/o timeout
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 04:58:09 UTC | Connected to broker at kafka0:9092 (unregistered)
2017-11-11 04:58:09 UTC | client/brokers registered new broker #0 at kafka0:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #10 at kafka10:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #1 at kafka1:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #9 at kafka9:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #2 at kafka2:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #7 at kafka7:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #3 at kafka3:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #8 at kafka8:9092
2017-11-11 04:58:09 UTC | client/brokers registered new broker #4 at kafka4:9092
2017-11-11 04:58:09 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 05:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 06:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 07:52:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:02:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:12:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:22:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:32:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
2017-11-11 08:42:29 UTC | client/metadata fetching metadata for all topics from broker kafka0:9092
After the brokers were registered, we were still not consuming messages. Also, after a while, even though the network problem persisted, committing the offset was no longer attempted and there were no more error messages, which seems a bit strange to me.
And when the brokers were registered, there was no partition rebalancing or anything like that in the logs.
The way I have implemented it is:
func (cons *KafkaConsumer) restartGroupWithZooKeeper() {
	time.Sleep(cons.persistTimeout)
	cons.readFromGroupWithZookeeper()
}
func (cons *KafkaConsumer) readFromGroupWithZookeeper() {
	cons.AddWorker()
	restartWithNewClient := false
	defer func() {
		// close the existing group client otherwise
		// there will be a stale client who had partition but is not reading.
		if (cons.consumerGroupClient != nil) && (!cons.consumerGroupClient.Closed()) {
			cons.consumerGroupClient.Close()
		}
		cons.WorkerDone()
		if restartWithNewClient {
			// mutual recursive function. Will call this function again.
			// so it is important to close the existing one above
			cons.restartGroupWithZooKeeper()
		}
	}()
	client, err := kzconsumergroup.JoinConsumerGroup(cons.group, []string{cons.topic}, cons.zookeeper, cons.consumerGroupConfig)
	cons.consumerGroupClient = client
	if err != nil {
		restartWithNewClient = true
		Log.Error.Printf("Restarting kafka consumer %s:%s - %s", cons.topic, cons.group, err.Error())
		return
	}
	spin := shared.NewSpinner(shared.SpinPriorityLow)
	for !cons.consumerGroupClient.Closed() {
		cons.WaitOnFuse()
		select {
		case event := <-cons.consumerGroupClient.Messages():
			cons.processMessage(event)
			cons.consumerGroupClient.CommitUpto(event)
		case err := <-cons.consumerGroupClient.Errors():
			restartWithNewClient = true
			Log.Error.Print("Kafka consumer error: ", err)
			return
		default:
			spin.Yield()
		}
	}
}
I saw that there are multiple issues in Sarama as well related to connection problems. So I wanted to post it here first to get some confirmation that this consumer code is fine before creating an issue in sarama directly.
@wvanbergen if you have any ideas on this, I would be happy to create a PR to resolve this.
This way we can automatically start consuming new partitions when they get added.
I am planning to upgrade my sarama clients from 0.8 to the latest version and use sarama-cluster. I see that offsets will now be committed to Kafka.
The Apache Kafka page says that to migrate from ZK-based storage to Kafka you need to do the following:
Set offsets.storage=kafka and dual.commit.enabled=true in your consumer config.
I cannot find these properties anywhere. Does anyone know whether we need to set them, and where?
Can I use this package if my offsets are stored in Kafka rather than ZK?
It would be nice if zookeeperOffsetManager.commitOffsets() got called from ConsumerGroup.Close().
Right now, anything that is 'markedForCommit' after the last commit and before the CG closes is dropped.
Maybe also provide a Commit() method in the OffsetManager interface to be able to force commits from CG/clients.
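The requested behaviour can be illustrated with a toy offset store. This is only a sketch of the idea; offsetStore and its methods are hypothetical and much simpler than the library's zookeeperOffsetManager, which commits periodically in the background.

```go
package main

import "fmt"

// offsetStore is a toy stand-in for an offset manager: MarkForCommit only
// records an offset in memory, Commit writes pending offsets to the backend.
type offsetStore struct {
	pending   map[int32]int64 // partition -> offset marked but not yet written
	committed map[int32]int64 // partition -> offset written to the backend
}

func newOffsetStore() *offsetStore {
	return &offsetStore{pending: map[int32]int64{}, committed: map[int32]int64{}}
}

// MarkForCommit remembers the latest processed offset for a partition.
func (s *offsetStore) MarkForCommit(partition int32, offset int64) {
	s.pending[partition] = offset
}

// Commit flushes everything marked since the previous commit. Exposing it
// lets callers force a commit instead of waiting for the periodic one.
func (s *offsetStore) Commit() {
	for p, o := range s.pending {
		s.committed[p] = o
		delete(s.pending, p)
	}
}

// Close performs a final Commit so that offsets marked after the last
// periodic commit are not dropped.
func (s *offsetStore) Close() {
	s.Commit()
}

func main() {
	s := newOffsetStore()
	s.MarkForCommit(0, 41)
	s.Commit() // periodic commit
	s.MarkForCommit(0, 44)
	s.Close() // without the final flush, offset 44 would be lost
	fmt.Println("committed offset for partition 0:", s.committed[0])
}
```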
I have 1 consumer and what might be a finicky zookeeper connection which can cause a panic with a divide by zero.
Log output (AFAIK, zookeeper is up, else I'd have bigger problems, however this client is behind a VPN):
2015/07/03 22:32:29 Failed to connect to zoo1:2181: dial tcp 10.129.196.49:2181: i/o timeout
2015/07/03 22:32:31 Failed to connect to zoo2:2181: dial tcp 10.129.196.11:2181: i/o timeout
2015/07/03 22:32:32 read tcp 10.129.196.49:2181: i/o timeout
2015/07/03 22:32:33 Failed to connect to zoo2:2181: dial tcp 10.129.196.11:2181: i/o timeout
panic: runtime error: integer divide by zero
[signal 0x8 code=0x7 addr=0x36bafd pc=0x36bafd]
goroutine 544 [running]:
github.com/wvanbergen/kafka/consumergroup.dividePartitionsBetweenConsumers(0x1005fc8, 0x0, 0x0, 0xc208dd2200, 0x20, 0x20, 0x0)
/Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/utils.go:39 +0x16d
github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicConsumer(0xc2081a8000, 0xafae10, 0xa, 0xc208162f60, 0xc208163320, 0xc208dfdce0)
/Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:310 +0x7c3
created by github.com/wvanbergen/kafka/consumergroup.(*ConsumerGroup).topicListConsumer
/Users/nimi/go/src/github.com/wvanbergen/kafka/consumergroup/consumer_group.go:260 +0x3f6
Which points to here
sort.Sort(partitions)
sort.Sort(consumers)
n := plen / clen
if plen%clen > 0 {
n++
}
I'm assuming (I haven't done any digging) that we see no consumers (because of zookeeper madness?) and try to divide partitions among zero consumers. Given that the ZooKeeper connection should eventually come back, what is the right course of action here? I think it would be ideal for the consumer to do no work until the connection with ZK stabilizes or comes back.
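A minimal sketch of the kind of guard this suggests, assuming the fix is simply to refuse to compute an assignment while the consumer list is empty. `partitionsPerConsumer` and `errNoConsumers` are hypothetical names used for illustration, not part of the consumergroup package; only the ceiling-division arithmetic is taken from the snippet above.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoConsumers is a hypothetical sentinel error, not part of the library.
var errNoConsumers = errors.New("no registered consumers; skipping partition assignment")

// partitionsPerConsumer returns the ceiling of plen/clen, the same arithmetic
// as the quoted snippet, but refuses to divide when no consumers are visible.
func partitionsPerConsumer(plen, clen int) (int, error) {
	if clen == 0 {
		return 0, errNoConsumers
	}
	n := plen / clen
	if plen%clen > 0 {
		n++
	}
	return n, nil
}

func main() {
	// Zero consumers: the caller can wait for ZooKeeper to recover instead of panicking.
	if _, err := partitionsPerConsumer(32, 0); err != nil {
		fmt.Println("rebalance skipped:", err)
	}

	n, _ := partitionsPerConsumer(32, 5)
	fmt.Println("partitions per consumer:", n)
}
```

With a guard like this the caller decides what to do with the error (retry later, log, or shut down) rather than crashing the process.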
When the same identifier for Kafka group is used to consume different topics, sometimes the wvanbergen client receives 0 messages.
As a workaround, users can explicitly prefix each group name with <topic>-, but ideally this would be fixed by the client library itself.
How can I disable rebalancing?
Hello,
I have observed that when I use the consumer group library, it works fine for a short time, after which it stops writing offsets to ZooKeeper. After some time, I only see the following offsets.
/consumers/testgroup/offsets/mmetopic1/2:2502052
/consumers/testgroup/offsets/mmetopic1/1:2417072
/consumers/testgroup/offsets/mmetopic1/0:2482172
My producer keeps writing new messages, but the consumer offset entries stay the same. When I try to instantiate another consumer, I get the following output (which might be useful for debugging).
[Sarama] 2014/08/30 04:48:37 Initializing new client
[Sarama] 2014/08/30 04:48:37 Fetching metadata from broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #1 at 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #3 at 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #2 at 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Successfully initialized new client
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Initializing new client
[Sarama] 2014/08/30 04:48:37 Fetching metadata from broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #1 at 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #3 at 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 Registered new broker #2 at 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Successfully initialized new client
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.129.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.128.1:9092
[Sarama] 2014/08/30 04:48:37 Connected to broker 10.1.130.1:9092
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Consumer instance registered (tpalkcoe3b9:66a33176-668a-4c89-b314-d238b36d178a).
init succeesfull
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Currently registered consumers: 5
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Started topic consumer for mmetopic1
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Claiming 1 of 3 partitions for topic mmetopic1.
[Sarama] 2014/08/30 04:48:37 [testgroup/d238b36d178a] Started partition consumer for mmetopic1:2 at offset 2502052.
[Sarama] 2014/08/30 04:48:38 [testgroup/d238b36d178a] Stopping partition consumer for mmetopic1:2 at offset 0.
[Sarama] 2014/08/30 04:48:38 [testgroup/d238b36d178a] Stopped topic consumer for mmetopic1
For example: cg, _ := consumergroup.JoinConsumerGroup("convert", kafkaTopics, Conf.Kafka.Consumer.ZKAddrs, config)
Within the same partition:
For the message with offset 187, I did not call cg.CommitUpto(msg) and cg.FlushOffsets().
For the message with offset 188, I did call cg.CommitUpto(msg) and cg.FlushOffsets().
For the message with offset 189, I did not call cg.CommitUpto(msg) and cg.FlushOffsets().
Then I restarted my consumer group. cg.Messages() delivers 189 again, but 187 is never redelivered. I am confused by this.
Hi,
When the internet connection drops for some time and then comes back, the consumers no longer consume incoming messages. The caller isn't notified of this, so no action can be taken.
You can reproduce this behavior the following way:
Here's my log right after I turned on my wifi:
2016-05-31T23:42:22Z Unstructured Log Line,file=structs.go:21,text= Failed to connect to 10.101.206.42:2181: dial tcp 10.101.206.42:2181: connect: network is unreachable
2016-05-31T23:42:23Z Unstructured Log Line,text= Connected to 10.102.206.12:2181,file=structs.go:21
2016-05-31T23:42:23Z Unstructured Log Line,file=structs.go:21,text= Authentication failed: zk: session has been expired by the server
2016-05-31T23:42:23Z [My-Group/80b66df0c462] Triggering rebalance due to consumer list change
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/0 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/1 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z [My-Group/80b66df0c462] mytopic/2 :: Stopping partition consumer at offset -1
2016-05-31T23:42:23Z consumer/broker/3 closed dead subscription to mytopic/0
2016-05-31T23:42:23Z consumer/broker/2 closed dead subscription to mytopic/2
2016-05-31T23:42:24Z consumer/broker/1 closed dead subscription to mytopic/1
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Stopped topic consumer
2016-05-31T23:42:24Z [My-Group/80b66df0c462] Currently registered consumers: 0
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Started topic consumer
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Claiming 0 of 3 partitions
2016-05-31T23:42:24Z [My-Group/80b66df0c462] mytopic :: Stopped topic consumer
The znode at /someroot/consumers/My-Group/ids is cleared after the connection is lost. From Kafka protocol doc:
The coordinator considers the consumer dead if it receives no heartbeat after this timeout in ms.
When the group gets its connection back, there are no registered consumers. It stops consuming but does not report this to the caller.
Sending an error on the errors channel would be a possible fix. It would notify the caller, who could then shut down the group and restart it.
Any thoughts?
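A rough sketch of what that proposal could look like, assuming the group knows the registered-consumer count after a rebalance and exposes a buffered error channel. `reportIfEmpty` and `errNoRegisteredConsumers` are hypothetical names for illustration, not the library's API.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoRegisteredConsumers is a hypothetical sentinel error.
var errNoRegisteredConsumers = errors.New("consumer group has no registered consumers")

// reportIfEmpty pushes an error to errs when the registered-consumer count is
// zero. The send is non-blocking so a caller that is not draining the channel
// cannot stall the rebalance loop. It returns true if an error was delivered.
func reportIfEmpty(registered int, errs chan<- error) bool {
	if registered > 0 {
		return false
	}
	select {
	case errs <- errNoRegisteredConsumers:
		return true
	default:
		return false // channel full; the error is dropped
	}
}

func main() {
	errs := make(chan error, 1)

	// Simulates the "Currently registered consumers: 0" state from the log.
	reportIfEmpty(0, errs)

	// Caller side: treat the error as a signal to close and rejoin the group.
	select {
	case err := <-errs:
		fmt.Println("restarting consumer group:", err)
	default:
		fmt.Println("group healthy")
	}
}
```

The non-blocking send is a design choice: it trades possible loss of the notification for never blocking the group internally. A blocking send would guarantee delivery but would require the caller to always read from the channel.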