
kafka-pixy's Introduction

Kafka-Pixy (gRPC/REST Proxy for Kafka)

Kafka-Pixy is a dual API (gRPC and REST) proxy for Kafka with automatic consumer group control. It is designed to hide the complexity of the Kafka client protocol and provide a stupid simple API that is trivial to implement in any language.

Kafka-Pixy is tested against Kafka versions 1.1.1 and 2.3.0, but is likely to work with any version starting from 0.8.2.2. It uses the Kafka Offset Commit/Fetch API to keep track of consumer offsets. However, the Group Membership API is not implemented yet, so Kafka-Pixy needs to talk to ZooKeeper directly to manage consumer group membership.

Warning: Kafka-Pixy does not support wildcard subscriptions and therefore cannot coexist in a consumer group with clients that use them. It should be possible to use other clients in the same consumer group as a Kafka-Pixy instance if they subscribe to topics by their full names, but that has never been tested, so do it at your own risk.

If you are anxious to get started, install Kafka-Pixy and proceed with the quick start guide for your weapon of choice: Curl, Python, or Golang. If you want to use some other language, you can still use one of the guides for inspiration, but you will need to generate gRPC client stubs from kafkapixy.proto yourself (please refer to the gRPC documentation for details).

Key Features:

  • Automatic Consumer Group Management: Unlike with Confluent's Kafka REST Proxy, clients do not need to explicitly create a consumer instance. When Kafka-Pixy gets a consume request for a group-topic pair for the first time, it automatically joins the group and subscribes to the topic. When requests stop coming for longer than the subscription timeout, it cancels the subscription;
  • At Least Once Guarantee: The main feature of Kafka-Pixy is that it guarantees at-least-once message delivery. The guarantee is achieved via a combination of synchronous production and explicit acknowledgement of consumed messages;
  • Dual API: Kafka-Pixy provides two types of API:
    • gRPC (Protocol Buffers over HTTP/2) recommended to produce/consume messages;
    • REST (JSON over HTTP) intended for testing and operations purposes, although you can use it to produce/consume messages too;
  • Multi-Cluster Support: One Kafka-Pixy instance can proxy to several Kafka clusters. You just need to define them in the config file and then address clusters by name given in the config file in your API requests.
  • Aggregation: Kafka works best when messages are read/written in batches, but from an application's standpoint it is easier to deal with individual message read/writes. Kafka-Pixy provides a message based API to clients, but internally it aggregates requests and sends them to Kafka in batches.
  • Locality: Kafka-Pixy is intended to run on the same host as the applications using it. Remember that it only provides a message based API - no batching, therefore using it over network is suboptimal.

gRPC API

gRPC is an open source framework that uses Protocol Buffers as its interface definition language and HTTP/2 as its transport protocol. The Kafka-Pixy API is defined in kafkapixy.proto. Client stubs for Golang and Python are generated and provided in this repository, but you can easily generate stubs for a number of other languages. Please refer to the gRPC documentation for information on the language of your choice.
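
For example, a minimal sketch of regenerating the Python stubs, assuming protoc support is provided by the grpcio-tools package and kafkapixy.proto is in the current directory:

# Produces kafkapixy_pb2.py (messages) and kafkapixy_pb2_grpc.py (service stubs).
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. kafkapixy.proto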

REST API

It is highly recommended to use the gRPC API for production/consumption. The HTTP API is only provided for quick tests and operational purposes.

Each API endpoint has two variants, which differ by the /clusters/<cluster> prefix. The variant with the prefix is to be used when multiple clusters are configured. The variant without the prefix operates on the default cluster (the one mentioned first in the YAML configuration file).
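
For example, with the default HTTP port (19092) the two requests below are equivalent when my_cluster (a hypothetical name) is the first cluster defined in the configuration:

curl -X POST 'localhost:19092/topics/foo/messages?sync' \
  -H 'Content-Type: text/plain' -d 'hello'
curl -X POST 'localhost:19092/clusters/my_cluster/topics/foo/messages?sync' \
  -H 'Content-Type: text/plain' -d 'hello'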

Produce

POST /topics/<topic>/messages
POST /clusters/<cluster>/topics/<topic>/messages

Writes a message to a topic on a particular cluster. If the request content type is either text/plain or application/json, then the message should be sent as the body of the request. If the content type is application/x-www-form-urlencoded, then the message should be passed as the msg form parameter.

If Kafka-Pixy is configured to use a version of the Kafka protocol (via the kafka.version proxy setting) that is 0.11.0.0 or later, it is also possible to add record headers to a message by adding HTTP headers to your request. Any HTTP header with the prefix "X-Kafka-" will have that prefix stripped and will be used as a record header. Since the values of Kafka headers can be arbitrary byte strings, the value of the HTTP header must be Base64-encoded.
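
For instance, a sketch of producing a message with a record header foo=bar (YmFy is the Base64 encoding of bar, matching the consume example further below):

curl -X POST 'localhost:19092/topics/foo/messages?sync' \
  -H 'Content-Type: text/plain' \
  -H 'X-Kafka-foo: YmFy' \
  -d 'message with a header'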

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to produce to
key yes A string whose hash is used to determine a partition to produce to. By default a random partition is selected.
msg * Used only if the request content type is application/x-www-form-urlencoded. In other cases the request body is the message.
sync yes A flag (value is ignored) that makes Kafka-Pixy wait for all ISR to confirm the write before sending a response back. By default a response is sent immediately after the request is received.

By default the message is written to Kafka asynchronously, that is, the HTTP request completes as soon as Kafka-Pixy reads the request from the wire, and production to Kafka is performed in the background. Therefore it is not guaranteed that the message will ever make it into Kafka.

If you need a guarantee that a message is written to Kafka, then pass the sync flag with your request. In that case the moment when Kafka-Pixy sends a response is governed by the producer.required_acks parameter in the YAML config. It can be one of:

  • no_response: the response is returned as soon as a produce request is delivered to a partition leader Kafka broker (no disk writes performed yet).
  • wait_for_local: the response is returned as soon as data is written to the disk by a partition leader Kafka broker.
  • wait_for_all: the response is returned after all in-sync replicas have data committed to disk.
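
For example, a minimal configuration sketch that makes synchronous producers wait for all in-sync replicas (the cluster alias my_cluster is illustrative; see default.yaml for the full layout):

proxies:
  my_cluster:
    producer:
      required_acks: wait_for_all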

E.g. if a Kafka-Pixy process has been started with the --tcpAddr=0.0.0.0:8080 argument, then you can test it using curl as follows:

curl -X POST 'localhost:8080/topics/foo/messages?key=bar&sync' \
  -H 'Content-Type: text/plain' \
  -d 'Good news everyone!'

If the message is submitted asynchronously then the response will be an empty JSON object {}.

If the message is submitted synchronously then in case of success (HTTP status 200) the response will look like:

{
  "partition": <partition number>,
  "offset": <message offset>
}

In case of failure (HTTP statuses 404 and 500) the response will be:

{
  "error": <human readable explanation>
}

Consume

GET /topics/<topic>/messages
GET /clusters/<cluster>/topics/<topic>/messages

Consumes a message from a topic of a particular cluster as a member of a particular consumer group. A message previously consumed from the same topic can be optionally acknowledged.

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to consume from.
group The name of a consumer group.
noAck yes A flag (value is ignored) indicating that no message should be acknowledged by this request. For the default behaviour see below.
ackPartition yes The partition number that the acknowledged message was consumed from. For the default behaviour see below.
ackOffset yes The offset of the acknowledged message. For the default behaviour see below.

If noAck is defined in a request, then no message is acknowledged by the request. If a request defines both the ackPartition and ackOffset parameters, then a message previously consumed from the specified partition of the same topic with the specified offset is acknowledged by the request. If none of the ack-related parameters is specified, then the request acknowledges the message that it returns, if any. This is called auto-ack mode.
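
For example, a sketch of explicit acknowledgment (the partition and offset values are illustrative):

# First request: consume without acknowledging anything.
curl -G 'localhost:19092/topics/foo/messages?group=bar&noAck'
# Suppose it returned {"partition": 0, "offset": 13, ...}. The next request
# consumes another message and acknowledges the previous one.
curl -G 'localhost:19092/topics/foo/messages?group=bar&ackPartition=0&ackOffset=13'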

When a message is consumed as a member of a consumer group for the first time, Kafka-Pixy joins the consumer group and subscribes to the topic. All Kafka-Pixy instances that are currently members of that group and subscribed to that topic distribute partitions among themselves, so that each Kafka-Pixy instance gets a subset of partitions for exclusive consumption (read more about Kafka consumer groups here).

If a Kafka-Pixy instance has not received consume requests for a topic for the duration of the subscription timeout, then it unsubscribes from the topic, and the topic partitions are redistributed among Kafka-Pixy instances that are still consuming from it.

If there are no unread messages in the topic, the request will block for the duration of the long polling timeout. If no messages are produced during this long poll, then the request will return a 408 Request Timeout error, otherwise the response will be a JSON document of the following structure:

{
  "key": <base64 encoded key>,
  "value": <base64 encoded message body>,
  "partition": <partition number>,
  "offset": <message offset>,
  "headers": [
    {
      "key": <string header key>,
      "value": <base64-encoded header value>
    }
  ]
}

e.g.:

{
  "key": "0JzQsNGA0YPRgdGP",
  "value": "0JzQvtGPINC70Y7QsdC40LzQsNGPINC00L7Rh9C10L3RjNC60LA=",
  "partition": 0,
  "offset": 13,
  "headers": [
    {
      "key": "foo",
      "value": "YmFy"
    }
  ]
}

Note that headers are only supported if the Kafka protocol version (set via the kafka.version configuration flag) is set to 0.11.0.0 or later.

Acknowledge

POST /topics/<topic>/acks
POST /clusters/<cluster>/topics/<topic>/acks

Acknowledges a previously consumed message.

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to produce to.
group The name of a consumer group.
partition A partition number that the acknowledged message was consumed from.
offset The offset of the acknowledged message.
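
E.g. to acknowledge the message consumed from partition 0 at offset 13 (values are illustrative):

curl -X POST 'localhost:19092/topics/foo/acks?group=bar&partition=0&offset=13'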

Get Offsets

GET /topics/<topic>/offsets
GET /clusters/<cluster>/topics/<topic>/offsets

Returns offset information for all partitions of the specified topic including the next offset to be consumed by the specified consumer group. The structure of the returned JSON document is as follows:

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to produce to.
group The name of a consumer group.
[
  {
    "partition": <partition id>,
    "begin": <oldest offset>,
    "end": <newest offset>,
    "count": <the number of messages in the topic, equals to `end` - `begin`>,
    "offset": <next offset to be consumed by this consumer group>,
    "lag": <equals to `end` - `offset`>,
    "metadata": <arbitrary string committed with the offset, not used by Kafka-Pixy. It is omitted if empty>
  },
  ...
]
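
E.g. to fetch offset information for consumer group bar on topic foo:

curl -G 'localhost:19092/topics/foo/offsets?group=bar'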

Set Offsets

POST /topics/<topic>/offsets
POST /clusters/<cluster>/topics/<topic>/offsets

Sets offsets to be consumed from the specified topic by a particular consumer group. The request content should be a list of JSON objects, where each object defines an offset to be set for a particular partition:

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to produce to.
group The name of a consumer group.
[
  {
    "partition": <partition id>,
    "offset": <next offset to be consumed by this consumer group>,
    "metadata": <arbitrary string>
  },
  ...
]
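
E.g. to rewind consumer group bar to the beginning of partition 0 of topic foo (the same reset step appears in the test scripts in the issues below):

curl -X POST -H 'Content-Type: application/json' \
  -d '[{"partition":0,"offset":0}]' \
  'localhost:19092/topics/foo/offsets?group=bar'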

Note that consumption by all consumer group members should cease before this call can be executed. That is necessary because, while consuming, Kafka-Pixy constantly updates partition offsets and does not expect them to be updated by anybody else. So it only reads them on group initialization, which happens when a consumer group request arrives after 20 seconds or more of consumer group inactivity on all Kafka-Pixy instances working with the Kafka cluster.

List Consumers

GET /topics/<topic>/consumers
GET /clusters/<cluster>/topics/<topic>/consumers

Returns a list of consumers that are subscribed to a topic.

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
topic The name of a topic to produce to.
group yes The name of a consumer group. By default returns data for all known consumer groups subscribed to the topic.

e.g.:

curl -G localhost:19092/topics/some_queue/consumers

yields:

{
  "integrations": {
    "pixy_jobs1_62065_2015-09-24T22:21:05Z": [0,1,2,3],
    "pixy_jobs2_18075_2015-09-24T22:21:28Z": [4,5,6],
    "pixy_jobs3_336_2015-09-24T22:21:51Z": [7,8,9]
  },
  "logstash-customer": {
    "logstash-customer_logs01-1443116116450-7f54d246-0": [0,1,2],
    "logstash-customer_logs01-1443116116450-7f54d246-1": [3,4,5],
    "logstash-customer_logs01-1443116116450-7f54d246-2": [6,7],
    "logstash-customer_logs01-1443116116450-7f54d246-3": [8,9]
  },
  "logstash-reputation4": {
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-0": [0],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-1": [1],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-10": [2],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-11": [3],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-12": [4],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-13": [5],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-14": [6],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-15": [7],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-2": [8],
    "logstash-reputation4_logs16-1443178335419-c08d8ab6-3": [9]
  },
  "test": {
    "pixy_core1_47288_2015-09-24T22:15:36Z": [0,1,2,3,4],
    "pixy_in7_102745_2015-09-24T22:24:14Z": [5,6,7,8,9]
  }
}

List Topics

GET /topics
GET /clusters/<cluster>/topics

Returns a list of topics, optionally with detailed configuration and partitions.

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
withPartitions yes Whether a list of partitions should be returned for every topic.
withConfig yes Whether configuration should be returned for every topic.
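
E.g., assuming the two flags are value-ignored like sync and noAck above, a full listing could be requested as:

curl -G 'localhost:19092/topics?withPartitions&withConfig'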

Get Topic Config

GET /topics/<topic>
GET /clusters/<cluster>/topics/<topic>

Returns topic configuration optionally with a list of partitions.

Parameter Opt Description
cluster yes The name of a cluster to operate on. By default the cluster mentioned first in the proxies section of the config file is used.
withPartitions yes Whether a list of partitions should be returned.

Configuration

Kafka-Pixy is designed to be very simple to run. It consists of a single executable that can be started just by passing a bunch of command line parameters to it - no configuration file needed.

However, if you do need to fine-tune Kafka-Pixy for your use case, you can provide a YAML configuration file. The default configuration file default.yaml is shipped in the release archive. In your configuration file you need to specify only the parameters that you want to change; all other options take their default values. If an option is both specified in the configuration file and provided as a command line argument, then the command line argument wins.

Command line parameters that Kafka-Pixy accepts are listed below:

Parameter Description
config Path to a YAML configuration file.
kafkaPeers Comma separated list of Kafka brokers. Note that these are just seed brokers. The other brokers are discovered automatically. (Default localhost:9092)
zookeeperPeers Comma separated list of ZooKeeper nodes followed by optional chroot. (Default localhost:2181)
grpcAddr TCP address that the gRPC API should listen on. (Default 0.0.0.0:19091)
tcpAddr TCP address that the HTTP API should listen on. (Default 0.0.0.0:19092)
unixAddr Unix Domain Socket that the HTTP API should listen on. If not specified then the service will not listen on a Unix Domain Socket.
pidFile Name of a pid file to create. If not specified then a pid file is not created.

You can run kafka-pixy -help to list all available command line parameters.
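
For example, a typical invocation might look like this (host names are illustrative):

kafka-pixy --config /etc/kafka-pixy.yaml \
  --kafkaPeers 'kafka1:9092,kafka2:9092' \
  --zookeeperPeers 'zk1:2181,zk2:2181'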

Security

SSL/TLS can be configured on both the gRPC and HTTP servers by specifying a certificate and key file in the configuration. Both files must be specified in order to run with security enabled.

If configured, both the gRPC and HTTP servers will run with TLS enabled.

Additionally, TLS may be configured for the Kafka cluster by enabling tls in the kafka section of the configuration YAML (along with any required certificates). Details can be found in the default YAML file (default.yaml).
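
A rough sketch of what such a configuration might look like (the key names below are illustrative, not authoritative; consult default.yaml for the exact ones):

proxies:
  my_cluster:
    kafka:
      tls:
        enabled: true
        # certificate and key file paths go here, see default.yaml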

License

Kafka-Pixy is under the Apache 2.0 license. See the LICENSE file for details.

kafka-pixy's People

Contributors

ahamidi, bbaugher, bippityboppity, codelingobot, evan-stripe, hermanschaaf, horkhe, korservick, lennon-stripe, maxthomas, mitchelnijdam-rockstars, slava-mg, solsson, the-alchemist, thrawn01

kafka-pixy's Issues

First time consume from a group may skip messages

Consider the following sequence of events:

  1. GET /topics/some_topic/messages?group=some_group is called for the very first time. Since there have been no offsets committed to Kafka for some_group consumer group, the default offset is -1 which means the next message that is produced. So the operation blocks on the long poll waiting for a message to be produced.
  2. No new messages have been produced during the long poll period (3 seconds), so the operation times out and returns 408 Request Timeout.
  3. A message is produced (e.g. POST /topics/some_topic/messages -H 'Content-Type: text/plain' -d "foo bar").
  4. GET /topics/some_topic/messages?group=some_group is called one more time, but rather than returning the "foo bar" message, it blocks and times out after 3 seconds.

The reason for this behaviour is that if the very first consume request times out, it does not initialize the Kafka offset storage with the most recent offset, so the next time consume is called it yet again uses -1 as the initial offset.

ClientID is invalid

When I try to start Kafka-Pixy, the following ClientID error occurs.
I use Kafka 2.10-0.9.0.2.

[user@m250hdp1 kafka-pixy-v0.11.1-linux-amd64]$ ./kafka-pixy --kafkaPeers "localhost:6667" --zookeeperPeers "localhost:2181" --logging "[{\"name\": \"console\", \"severity\": \"debug\"}]"
Aug 16 18:37:02.205 kafka-pixy INFO PID:20898 [main.go:81:main.main] Starting with config: &{UnixAddr: TCPAddr:0.0.0.0:19092 ClientID:pixy_m250hdp1.inside.corp.int_20898_2016-08-16T18:37:02Z Kafka:{SeedPeers:[localhost:6667]} ZooKeeper:{SeedPeers:[localhost:2181] Chroot:} Producer:{ChannelBufferSize:4096 ShutdownTimeout:30s DeadMessageCh:<nil>} Consumer:{ChannelBufferSize:64 LongPollingTimeout:3s RegistrationTimeout:20s BackOffTimeout:500ms RebalanceDelay:250ms OffsetsCommitInterval:500ms ReturnErrors:false}}
...
Aug 16 18:37:02.207 kafka-pixy INFO PID:20898 [client.go:107:github.com/Shopify/sarama.NewClient] [sarama] Initializing new client
Aug 16 18:37:02.207 kafka-pixy ERROR PID:20898 [main.go:84:main.main] Failed to start service: err=(failed to spawn consumer, err=(failed to create Kafka client for message streams: err=(kafka: invalid configuration (ClientID is invalid))))

Consuming from an empty topic affects other consumers throughput

When consumption from a queue that does not have a steady influx of new events is performed alongside consumption from a busy queue, the consumption rate of the busy queue drops significantly (by a factor of two in my testing). Note that both consumers have to be in the same group.

Producer should have capacity enough to survive up to 1 minute leader elections

It was observed in production that a Kafka leader election took more than 30 seconds upon a node death. However, Kafka-Pixy only retries production for 15 seconds. As a result a bunch of asynchronously produced messages were lost. To avoid such losses in the future, the producer should keep asynchronously produced messages for about 1 minute and have enough internal buffer capacity to hold a minute's worth of messages. When #16 is implemented, users will be able to tweak the respective parameters to meet their specific needs.

Group consumer closed while rebalancing in progress

It turned out that the group consumer can stop while leaving a pending rebalancing goroutine behind. That in turn was causing occasional panics like the one below:

May  7 00:45:22.164 INFO  </T[2]/cons[0]/dispatcher[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]/dispatcher[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/offset_mgr_f[0]/mapper[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]/T:no-such-topic[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]/member[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]/manager[0]> started
May  7 00:45:22.164 INFO  </T[2]/cons[0]/G:g1[0]/msg_stream_f[0]/mapper[0]> started
May  7 00:45:22.164 INFO  [zk] Connected to 192.168.100.67:2181
May  7 00:45:22.168 INFO  [zk] Authenticated: id=95833896904687742, timeout=15000
May  7 00:45:22.180 INFO  </T[2]/cons[0]/G:g1[0]/member[0]> submitted: topics=[no-such-topic]
May  7 00:45:22.286 INFO  </T[2]/cons[0]/G:g1[0]/member[0]> fetched subscriptions: map[consumer-1:[no-such-topic]]
May  7 00:45:22.286 INFO  </T[2]/cons[0]/G:g1[0]/manager[0]/rebalancer[0]> started
May  7 00:45:22.286 INFO  [sarama] client/metadata fetching metadata for [no-such-topic] from broker 192.168.100.67:9093
May  7 00:45:22.287 INFO  [sarama] client/metadata found some partitions to be leaderless
May  7 00:45:22.287 INFO  [sarama] client/metadata retrying after 250ms... (3 attempts remaining)
May  7 00:45:22.539 INFO  [sarama] client/metadata fetching metadata for [no-such-topic] from broker 192.168.100.67:9093
May  7 00:45:22.540 INFO  [sarama] client/metadata found some partitions to be leaderless
May  7 00:45:22.540 INFO  [sarama] client/metadata retrying after 250ms... (2 attempts remaining)
May  7 00:45:22.794 INFO  [sarama] client/metadata fetching metadata for [no-such-topic] from broker 192.168.100.67:9093
May  7 00:45:22.795 INFO  [sarama] client/metadata found some partitions to be leaderless
May  7 00:45:22.795 INFO  [sarama] client/metadata retrying after 250ms... (1 attempts remaining)
May  7 00:45:23.048 INFO  [sarama] client/metadata fetching metadata for [no-such-topic] from broker 192.168.100.67:9093
May  7 00:45:23.049 INFO  [sarama] client/metadata found some partitions to be leaderless
May  7 00:45:23.049 INFO  </T[2]/cons[0]/G:g1[0]/manager[0]/rebalancer[0]> stopped
May  7 00:45:23.049 ERROR </T[2]/cons[0]/G:g1[0]/manager[0]> rebalance failed: err=(failed to get partition list: topic=no-such-topic, err=(kafka server: Request was for a topic or partition that does not exist on this broker.))
May  7 00:45:23.154 INFO  </T[2]/cons[0]/G:g1[0]/manager[0]/rebalancer[1]> started
May  7 00:45:23.154 INFO  [sarama] client/metadata fetching metadata for [no-such-topic] from broker 192.168.100.67:9093
May  7 00:45:23.155 INFO  [sarama] client/metadata found some partitions to be leaderless
May  7 00:45:23.155 INFO  [sarama] client/metadata retrying after 250ms... (3 attempts remaining)
May  7 00:45:23.165 INFO  </T[2]/cons[0]/G:g1[0]/T:no-such-topic[0]> stopped
May  7 00:45:23.165 INFO  </T[2]/cons[0]/G:g1[0]/dispatcher[0]> child stopped: /T[2]/cons[0]/G:g1[0]/T:no-such-topic[0]
May  7 00:45:23.165 INFO  </T[2]/cons[0]/G:g1[0]/dispatcher[0]> stopped
May  7 00:45:23.181 INFO  </T[2]/cons[0]/G:g1[0]/member[0]> submitted: topics=[]
May  7 00:45:23.184 INFO  </T[2]/cons[0]/G:g1[0]/member[0]> stopped
May  7 00:45:23.184 INFO  </T[2]/cons[0]/G:g1[0]/manager[0]> stopped
May  7 00:45:23.184 INFO  </T[2]/cons[0]/G:g1[0]/msg_stream_f[0]/mapper[0]> stopped
May  7 00:45:23.184 INFO  </T[2]/cons[0]/G:g1[0]> stopped
May  7 00:45:23.184 INFO  </T[2]/cons[0]/dispatcher[0]> child stopped: /T[2]/cons[0]/G:g1[0]
May  7 00:45:23.184 INFO  </T[2]/cons[0]/dispatcher[0]> stopped
May  7 00:45:23.184 INFO  </T[2]/cons[0]/offset_mgr_f[0]/mapper[0]> stopped
May  7 00:45:23.187 INFO  [zk] Recv loop terminated: err=EOF
May  7 00:45:23.187 INFO  [zk] Send loop terminated: err=<nil>
May  7 00:45:23.187 INFO  [sarama] Closing Client
May  7 00:45:23.187 INFO  [sarama] Closing Client
May  7 00:45:23.412 ERROR </T[2]/cons[0]/G:g1[0]/manager[0]/rebalancer[1]> paniced: assignment to entry in nil map, stack=goroutine 176 [running]:
runtime/debug.Stack(0x0, 0x0, 0x0)
    /usr/local/go/src/runtime/debug/stack.go:24 +0x80
github.com/mailgun/kafka-pixy/actor.Spawn.func1.1(0xc82021c7a0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/actor/actor.go:63 +0x5b
panic(0x406160, 0xc820760cc0)
    /usr/local/go/src/runtime/panic.go:443 +0x4e9
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).registerBroker(0xc8201ac780, 0xc82071b8f0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:382 +0xa9
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).updateMetadata(0xc8201ac780, 0xc82074d920, 0xc800000000, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:624 +0x123
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).tryRefreshMetadata(0xc8201ac780, 0xc820550660, 0x1, 0x1, 0x2, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:591 +0x33f
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).tryRefreshMetadata.func1(0x974fd8, 0xc820550758, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:575 +0x207
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).tryRefreshMetadata(0xc8201ac780, 0xc820550660, 0x1, 0x1, 0x3, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:593 +0x473
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).RefreshMetadata(0xc8201ac780, 0xc820550660, 0x1, 0x1, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:316 +0x15c
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(*client).Partitions(0xc8201ac780, 0xc8202181c1, 0xd, 0x0, 0x0, 0x0, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama/client.go:217 +0x176
github.com/mailgun/kafka-pixy/vendor/github.com/Shopify/sarama.(Client).Partitions-fm(0xc8202181c1, 0xd, 0x0, 0x0, 0x0, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:66 +0x71
github.com/mailgun/kafka-pixy/consumer/groupcsm.(*T).resolvePartitions(0xc820562820, 0xc82000a9f0, 0x1, 0x0, 0x0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:269 +0x5c3
github.com/mailgun/kafka-pixy/consumer/groupcsm.(*T).runRebalancer(0xc820562820, 0xc82021c7a0, 0xc82000ba10, 0xc82000a9f0, 0xc8200f6660)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:201 +0x40
github.com/mailgun/kafka-pixy/consumer/groupcsm.(*T).runManager.func1()
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:181 +0x44
github.com/mailgun/kafka-pixy/actor.Spawn.func1(0x0, 0xc82021c7a0, 0xc82000ba70)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/actor/actor.go:68 +0x112
created by github.com/mailgun/kafka-pixy/actor.Spawn
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/actor/actor.go:69 +0x67

Expose all configuration knobs

Make all configuration parameters from the service config available for tuning by users. There should be two ways to define them:

  • in a configuration file whose name is passed to the service via the --config-file command line parameter;
  • in a string that represents a valid YAML document, via the --config-yaml command line parameter.

Panic in partition multiplexer

</_[0]/cons[0]/G:kraken[0]/mux[0]> paniced: runtime error: index out of range, stack=goroutine 195 [running]:
runtime/debug.Stack(0xc420856c80, 0x7215c0, 0xc420012070)
	/usr/local/go/src/runtime/debug/stack.go:24 +0x79
github.com/mailgun/kafka-pixy/actor.Spawn.func1.1(0xc4202639e0)
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:63 +0xe5
panic(0x7215c0, 0xc420012070)
	/usr/local/go/src/runtime/panic.go:458 +0x243
github.com/mailgun/kafka-pixy/consumer/multiplexer.(*T).run(0xc420184f50)
	/go/src/github.com/mailgun/kafka-pixy/consumer/multiplexer/multiplexer.go:193 +0x607
github.com/mailgun/kafka-pixy/consumer/multiplexer.(*T).(github.com/mailgun/kafka-pixy/consumer/multiplexer.run)-fm()
	/go/src/github.com/mailgun/kafka-pixy/consumer/multiplexer/multiplexer.go:136 +0x2a
github.com/mailgun/kafka-pixy/actor.Spawn.func1(0xc420184f88, 0xc4202639e0, 0xc420249d80)
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:68 +0xca
created by github.com/mailgun/kafka-pixy/actor.Spawn
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:69 +0x58

Failure to start because of existing UNIX named socket

After a server reboot, I noticed a case where kafka-pixy wasn't starting. Only after removing the left-over UNIX socket file did it start immediately. We need to unlink this file explicitly before binding to it.

Empty consumer group name should not be allowed

I have looked around quite a bit but have not been able to find the reason or fix for the following ZooKeeper errors that are printed on the Kafka-Pixy console when the consumer connects. I have tried both the C++ and Python gRPC stubs, but the end result is the same.

Jun 23 19:56:42.894 INFO [sarama] Connected to broker at localhost:9092 (registered as #0)
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]> started
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]/dispatcher[0]> started
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]/member[0]> started
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]/T:myMsgQueue[0]> started
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]/manager[0]> started
Jun 23 19:56:47.054 INFO </default[0]/cons[0]/G:[0]/msg_stream_f[0]/mapper[0]> started
Jun 23 19:56:47.059 ERROR </default[0]/cons[0]/G:[0]/member[0]> failed to create a group znode: err=(zk: unknown error)
Jun 23 19:56:47.570 ERROR </default[0]/cons[0]/G:[0]/member[0]> failed to create a group znode: err=(zk: unknown error)

I start Kafka in a container as follows:

docker pull spotify/kafka
docker run --rm -p 2181:2181 -p 9092:9092 \
  --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 \
  --name kafka -h kafka spotify/kafka

Panic in partition consumer

From a production log:

</_[0]/cons[0]/G:kraken[0]/P:iad_ml_count_events_0[0]> paniced: runtime error: invalid memory address or nil pointer dereference, stack=goroutine 254 [running]:
runtime/debug.Stack(0xc420b219f8, 0x7215a0, 0xc420012030)
	/usr/local/go/src/runtime/debug/stack.go:24 +0x79
github.com/mailgun/kafka-pixy/actor.Spawn.func1.1(0xc4202c7c80)
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:63 +0xe5
panic(0x7215a0, 0xc420012030)
	/usr/local/go/src/runtime/panic.go:458 +0x243
github.com/mailgun/kafka-pixy/consumer/partitioncsm.(*T).run(0xc420100ab0)
	/go/src/github.com/mailgun/kafka-pixy/consumer/partitioncsm/partitioncsm.go:151 +0xba7
github.com/mailgun/kafka-pixy/consumer/partitioncsm.(*T).(github.com/mailgun/kafka-pixy/consumer/partitioncsm.run)-fm()
	/go/src/github.com/mailgun/kafka-pixy/consumer/partitioncsm/partitioncsm.go:60 +0x2a
github.com/mailgun/kafka-pixy/actor.Spawn.func1(0xc420100b28, 0xc4202c7c80, 0xc420a9b7d0)
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:68 +0xca
created by github.com/mailgun/kafka-pixy/actor.Spawn
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:69 +0x58

Rebalancing fails due to non existent topic

Logs show that rebalancing was failing with the following error:

INFO [sarama] </smartConsumer[0]/ledger_reboot[2]/rebalance[71949]> entered: [map[account_events:/smartConsumer[0]/ledger_reboot[2]/account_events[0] pipeline_events:/smartConsumer[0]/ledger_reboot[2]/pipeline_events[0] domain_events:/smartConsumer[0]/ledger_reboot[2]/domain_events[0]] map[pixy_ledger1.dfw.definbox.com_7801_2016-04-05T19:53:04Z:[account_events domain_events pipeline_events]]]
ERROR </smartConsumer[0]/ledger_reboot[2]/managePartitions[0]> rebalance failed: err=(failed to get partition list: topic=domain_events, err=(kafka server: Request was for a topic or partition that does not exist on this broker.))

The reason is the missing domain_events topic.

Messages consumed twice during rebalancing

According to the logs, when a partition is being released due to rebalancing, Kafka-Pixy sometimes fails to commit the last consumed offset to Kafka, which results in duplicate consumption of some messages by another Kafka-Pixy instance.

Panic in partition multiplexer

panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x48 pc=0x6b3ec5]

goroutine 2752 [running]:
panic(0x7105e0, 0xc420012030)
        /usr/local/go/src/runtime/panic.go:500 +0x1a1
github.com/mailgun/kafka-pixy/actor.Spawn.func1.1(0xc4206eace0)
        /go/src/github.com/mailgun/kafka-pixy/actor/actor.go:64 +0x1e9
panic(0x7105e0, 0xc420012030)
        /usr/local/go/src/runtime/panic.go:458 +0x243
github.com/mailgun/kafka-pixy/consumer/topiccsm.(*T).Messages(0x0, 0xc421986300)
        /go/src/github.com/mailgun/kafka-pixy/consumer/topiccsm/topiccsm.go:56 +0x5
github.com/mailgun/kafka-pixy/consumer/multiplexer.(*T).run(0xc4200aab40)
        /go/src/github.com/mailgun/kafka-pixy/consumer/multiplexer/multiplexer.go:187 +0x473
github.com/mailgun/kafka-pixy/consumer/multiplexer.(*T).(github.com/mailgun/kafka-pixy/consumer/multiplexer.run)-fm()
        /go/src/github.com/mailgun/kafka-pixy/consumer/multiplexer/multiplexer.go:130 +0x2a
github.com/mailgun/kafka-pixy/actor.Spawn.func1(0xc4200aab78, 0xc4206eace0, 0xc420c13890)
        /go/src/github.com/mailgun/kafka-pixy/actor/actor.go:68 +0xca
created by github.com/mailgun/kafka-pixy/actor.Spawn
        /go/src/github.com/mailgun/kafka-pixy/actor/actor.go:69 +0x58
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference

Inexplicable offset manager timeouts

From time to time we see in the logs that the offset manager timeout elapses. The timeout means that we do not commit offsets in time, therefore messages can be delivered more than once in case of a Kafka-Pixy crash. The root cause of this issue is not clear and should be investigated, understood, and fixed before it gets out of hand.

request timeout 1.500776182s
github.com/mailgun/kafka-pixy/offsetmgr.(*offsetMgr).run
	/go/src/github.com/mailgun/kafka-pixy/offsetmgr/offsetmgr.go:320
github.com/mailgun/kafka-pixy/offsetmgr.(*offsetMgr).(github.com/mailgun/kafka-pixy/offsetmgr.run)-fm
	/go/src/github.com/mailgun/kafka-pixy/offsetmgr/offsetmgr.go:138
github.com/mailgun/kafka-pixy/actor.Spawn.func1
	/go/src/github.com/mailgun/kafka-pixy/actor/actor.go:98
runtime.goexit
	/usr/local/go/src/runtime/asm_amd64.s:2337

The issue becomes more pronounced when several heavily consuming Kafka-Pixy instances are restarted at once (e.g. during a deployment).

Add support for explicit acknowledgments

Kafka-Pixy commits a message offset as consumed as soon as the message is read from an internal consumer channel buffer. That is, there is no guarantee that a committed offset has actually been seen by the caller. To make sure that no messages are missed by clients, some sort of explicit acknowledgment is required.

There is an idea to implement explicit acknowledgment via a parameter of GET /topics/{}/messages method (e.g. ack=<offset>). So that a client can both retrieve a new message and acknowledge the previous one in one HTTP request saving a network round-trip.

Consumption of partition stops if segment expires

If a segment of a topic partition is deleted by Kafka, due to the retention period being exceeded, while Kafka-Pixy reads from it, then consumption from the partition stops. Not even a log record with ERROR severity is reported.

Mar 23 07:01:38  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]> started
Mar 23 07:01:38  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]> partition claimed: via=/_[0]/cons[0]/G:analytics[1]/member[0], retries=0, took=280ms
Mar 23 07:01:38  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/offset_mgr[0]> started
Mar 23 07:01:38  </_[0]/offset_mgr_f[0]/mapper[0]/reassign[164]> assign /_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/offset_mgr[0] -> /_[0]/offset_mgr_f[0]/broker_2_aggr[0] (ref=9)
Mar 23 07:01:38  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/offset_mgr[0]> assigned /_[0]/offset_mgr_f[0]/broker_2_aggr[0]
Mar 23 07:01:40  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]> initialized: offset=485662619, sparseAcks=
Mar 23 07:01:40  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/msg_stream[0]> started
Mar 23 07:01:40  </_[0]/cons[0]/G:analytics[1]/msg_stream_f[0]/mapper[0]/reassign[5]> assign /_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/msg_stream[0] -> /_[0]/cons[0]/G:analytics[1]/msg_stream_f[0]/broker_0_aggr[0] (ref=2)
Mar 23 07:01:40  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/msg_stream[0]> assigned /_[0]/cons[0]/G:analytics[1]/msg_stream_f[0]/broker_0_aggr[0]
Mar 23 07:02:39  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/msg_stream[0]> fetch failed: err=kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.
Mar 23 07:02:39  </_[0]/cons[0]/G:analytics[1]/P:use1_pipeline_events_35[0]/msg_stream[0]> stopped

Add support for Kafka 0.9.0 group management

A new set of APIs was introduced in Kafka 0.9.0 that allows using Kafka itself for consumer group management, leaving clients free of direct communication with ZooKeeper. It should be possible to make Kafka-Pixy use this API.

81 of 100?

#!/bin/bash

export TOPIC="test10"
export ENTRIES=100

# Subscribe the consumer group to the topic.
curl -G --silent "http://localhost:19092/topics/${TOPIC}/messages?group=bar"

# Produce $ENTRIES messages with 10 parallel workers.
function pushit() {
    curl --silent -X POST "http://localhost:19092/topics/${TOPIC}/messages?sync" -H "Content-Type: text/plain" -d "Entry $1"
}
export -f pushit
time (seq $ENTRIES | parallel -j10 pushit)

# Rewind the group offset to the beginning of partition 0.
curl -X POST -H'Content-Type: application/json' -d'[{"partition":0,"offset":0}]' "http://localhost:19092/topics/${TOPIC}/offsets?group=bar"
echo
sleep 2

# Consume and base64-decode $ENTRIES messages with 10 parallel workers.
function pullit() {
    value=$(curl -G --silent "http://localhost:19092/topics/${TOPIC}/messages?group=bar" | tr "\n" " ")
    dec=$(echo $value | jq '.value' -r | base64 --decode)
    echo "$dec $value"
}
export -f pullit
time (seq $ENTRIES | parallel -j10 pullit)

So, the level of parallelism is 10:

Entry 1 {   "key": null,   "value": "RW50cnkgMQ==",   "partition": 0,   "offset": 0 }
Entry 2 {   "key": null,   "value": "RW50cnkgMg==",   "partition": 0,   "offset": 1 }
...(skip)...
Entry 74 {   "key": null,   "value": "RW50cnkgNzQ=",   "partition": 0,   "offset": 73 }
Entry 75 {   "key": null,   "value": "RW50cnkgNzU=",   "partition": 0,   "offset": 74 }
Entry 76 {   "key": null,   "value": "RW50cnkgNzY=",   "partition": 0,   "offset": 75 }
Entry 77 {   "key": null,   "value": "RW50cnkgNzc=",   "partition": 0,   "offset": 76 }
Entry 79 {   "key": null,   "value": "RW50cnkgNzk=",   "partition": 0,   "offset": 77 }
Entry 78 {   "key": null,   "value": "RW50cnkgNzg=",   "partition": 0,   "offset": 78 }
Entry 80 {   "key": null,   "value": "RW50cnkgODA=",   "partition": 0,   "offset": 79 }
Entry 81 {   "key": null,   "value": "RW50cnkgODE=",   "partition": 0,   "offset": 80 }
...(empty lines skipped)...

{   "error": "long polling timeout" }
{   "error": "long polling timeout" }
{   "error": "long polling timeout" }
{   "error": "long polling timeout" }

I get back only 81 entries, 4 timeouts and quite a few empty lines :)

python startup guide has a broken link

The links for the protoc-generated files in bullet #3 are pointing to the same file:

Create kafkapixy package in your application and copy kafkapixy_pb2.py and kafkapixy_pb2_grpc.py files to it.

Make /topics/<>/consumers output easier to read

At the moment the output of GET /topics/<>/consumers looks like this:

"test-group": {
    "pixy_host1_1846_2015-10-28T07:45:04Z": [
      0,
      1,
      2,
      3,
      4,
      5
    ],
    "pixy_host2_1799_2015-10-28T07:18:42Z": [
      6,
      7,
      8,
      9,
      10,
      11
    ],
    "pixy_host3_1883_2015-10-28T04:17:35Z": [
      12,
      13,
      14,
      15,
      16,
      17
    ]
}

It would be easier to digest if all partitions were listed on the same line, e.g.:

"test-group": {
    "pixy_host1_1846_2015-10-28T07:45:04Z": [0,1,2,3,4,5],
    "pixy_host2_1799_2015-10-28T07:18:42Z": [6,7,8,9,10,11],
    "pixy_host3_1883_2015-10-28T04:17:35Z": [12,13,14,15,16,17]
}

Offset commit failures reported

In the logs I see a lot of offset commit failure errors, but according to GET /topics/{}/offsets the commits seem to be applied anyway. See a log snapshot with the error below:

Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/kraken:ml_count_events:0[6]/processCommits[0]> offset commit failed: err=(request timeout)
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/watch4Changes[0]> reassign: change={created=0, outdated=1, closed=0, failed=0}
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/watch4Changes[0]/reassign[908]> entered: [{created=0, outdated=1, closed=0, failed=0}]
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO client/coordinator requesting coordinator for consumergoup kraken from kafka2.ord2.postgun.com:9092
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO client/coordinator coordinator for consumergoup kraken is #2 (kafka3.ord2.postgun.com:9092)
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/watch4Changes[0]/reassign[908]> unassign /offsetManager[2]/kraken:ml_count_events:0[6] -> /offsetManager[2]/broker:2[4] (ref=7)
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/watch4Changes[0]/reassign[908]> assign /offsetManager[2]/kraken:ml_count_events:0[6] -> /offsetManager[2]/broker:2[4] (ref=8)
Sep 18 19:31:58 jobs1 kafka-pixy[83177]: INFO </offsetManager[2]/watch4Changes[0]/reassign[908]> leaving

Subscriptions can be lost due to ZooKeeper connection failure

From the logs below it looks like Kafka-Pixy may lose ALL current topic subscriptions if a connection to a ZooKeeper node breaks for some period of time.

Sep 17 09:39:32 binnacle1 kafka-pixy[12820]: INFO </registry:kraken[13]/watcher[1]> retrieving group membership state...
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO Failed to connect to bus3.dfw.definbox.com:2181: dial tcp 10.223.224.37:2181: i/o timeout
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: ERROR </registry:kraken[13]/watcher[1]> failed to watch members: err=(zk: could not connect to a server), retryIn=500ms
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO Failed to set previous watches: zk: could not connect to a server
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO client/metadata fetching metadata for all topics from broker bus3.dfw.definbox.com:9092
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO client/metadata fetching metadata for all topics from broker bus3.dfw.definbox.com:9092
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO client/metadata fetching metadata for all topics from broker bus1.dfw.definbox.com:9092
Sep 17 09:39:33 binnacle1 kafka-pixy[12820]: INFO client/metadata fetching metadata for all topics from broker bus3.dfw.definbox.com:9092
Sep 17 09:39:34 binnacle1 kafka-pixy[12820]: ERROR </registry:kraken[13]/watcher[1]> failed to watch members: err=(zk: could not connect to a server), retryIn=500ms
Sep 17 09:39:35 binnacle1 kafka-pixy[12820]: ERROR </registry:kraken[13]/watcher[1]> failed to watch members: err=(zk: could not connect to a server), retryIn=500ms
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </registry:kraken[13]/watcher[1]> fetching group subscriptions...
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </registry:kraken[13]/watcher[1]> group subscriptions changed: map[]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/rebalance[6]> entered: [map[ml_count_events:/smartConsumer[3]/kraken[7]/ml_count_events[2]] map[]]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/ml_count_events:1[27]/pullMessages[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/ml_count_events:0[25]/pullMessages[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/kraken:ml_count_events:1[21]/processCommits[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]> reassign: change={created=0, outdated=0, closed=1, failed=0}
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/kraken:ml_count_events:0[19]/processCommits[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[26]> entered: [{created=0, outdated=0, closed=1, failed=0}]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[26]> unassign /consumer[1]/ml_count_events:1[27] -> /consumer[1]/broker:1[28] (ref=0)
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]> reassign: change={created=0, outdated=0, closed=1, failed=0}
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[26]> decomission /consumer[1]/broker:1[28]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[27]> entered: [{created=0, outdated=0, closed=1, failed=0}]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/broker:1[28]/batchRequests[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[27]> unassign /offsetManager[2]/kraken:ml_count_events:1[21] -> /offsetManager[2]/broker:1[20] (ref=1)
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[27]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]> reassign: change={created=0, outdated=0, closed=1, failed=0}
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[28]> entered: [{created=0, outdated=0, closed=1, failed=0}]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[28]> unassign /offsetManager[2]/kraken:ml_count_events:0[19] -> /offsetManager[2]/broker:1[20] (ref=0)
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[28]> decomission /offsetManager[2]/broker:1[20]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/broker:1[20]/batchCommits[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/broker:1[20]/executeBatches[1]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </offsetManager[2]/watch4Changes[0]/reassign[28]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/ml_count_events:0[4]> partition released
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/ml_count_events:1[5]> partition released
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/ml_count_events:0[4]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/ml_count_events:1[5]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </smartConsumer[3]/kraken[7]/rebalance[6]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/broker:1[28]/executeBatches[1]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[26]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]> reassign: change={created=0, outdated=0, closed=1, failed=0}
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[27]> entered: [{created=0, outdated=0, closed=1, failed=0}]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[27]> unassign /consumer[1]/ml_count_events:0[25] -> /consumer[1]/broker:0[26] (ref=0)
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[27]> decomission /consumer[1]/broker:0[26]
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/broker:0[26]/batchRequests[0]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/broker:0[26]/executeBatches[1]> leaving
Sep 17 09:39:36 binnacle1 kafka-pixy[12820]: INFO </consumer[1]/watch4Changes[0]/reassign[27]> leaving

Panic in group consumer

The following panic appears in the log:

May  4 15:48:37.460 INFO  </cons[0]/G:ledger_reboot[0]/P:pipeline_events_14[0]/msg_stream[0]> assigned /cons[0]/G:ledger_reboot[0]/msg_stream_f[0]/broker_2_aggr[0]
May  4 15:49:19.076 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child expired: /cons[0]/G:ledger_reboot[0]/T:user_events[0]
May  4 15:49:19.076 INFO  </cons[0]/G:ledger_reboot[0]/T:user_events[0]> stopped
May  4 15:49:19.076 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child stopped: /cons[0]/G:ledger_reboot[0]/T:user_events[0]
May  4 15:49:19.077 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child expired: /cons[0]/G:ledger_reboot[0]/T:account_events[0]
May  4 15:49:19.077 INFO  </cons[0]/G:ledger_reboot[0]/T:account_events[0]> stopped
May  4 15:49:19.077 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child stopped: /cons[0]/G:ledger_reboot[0]/T:account_events[0]
May  4 15:49:19.080 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child expired: /cons[0]/G:ledger_reboot[0]/T:pipeline_events[0]
May  4 15:49:19.081 INFO  </cons[0]/G:ledger_reboot[0]/T:pipeline_events[0]> stopped
May  4 15:49:19.081 INFO  </cons[0]/G:ledger_reboot[0]/dispatcher[0]> child stopped: /cons[0]/G:ledger_reboot[0]/T:pipeline_events[0]
May  4 15:49:19.088 INFO  </cons[0]/G:ledger_reboot[0]/member[0]> submitted: topics=[account_events iad_account_events iad_pipeline_events iad_user_events pipeline_events]
May  4 15:49:19.100 INFO  </cons[0]/G:ledger_reboot[0]/member[0]> submitted: topics=[iad_account_events iad_pipeline_events iad_user_events]
May  4 15:49:19.352 INFO  </cons[0]/G:ledger_reboot[0]/member[0]> fetched subscriptions: map[pixy_ledger1.ord2.postgun.com_13994_2016-05-04T15:46:18Z:[iad_account_events iad_pipeline_events iad_user_events]]
May  4 15:49:19.352 INFO  </cons[0]/G:ledger_reboot[0]/manager[0]/rebalancer[1]> started
May  4 15:49:19.352 INFO  </cons[0]/G:ledger_reboot[0]/manager[0]/rebalancer[1]> assigned partitions: map[iad_account_events:[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15] iad_pipeline_events:[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23] iad_
user_events:[0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23]]
May  4 15:49:19.352 ERROR </cons[0]/G:ledger_reboot[0]/manager[0]/rebalancer[1]> paniced: runtime error: invalid memory address or nil pointer dereference, stack=
/home/mg/goworld/src/github.com/mailgun/kafka-pixy/actor/actor.go:63 (0x59d5ab)
/usr/local/go/src/runtime/asm_amd64.s:437 (0x45afee)
/usr/local/go/src/runtime/panic.go:423 (0x42ac29)
/usr/local/go/src/runtime/panic.go:42 (0x4292e9)
/usr/local/go/src/runtime/sigpanic_unix.go:24 (0x43fd5a)
/home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:244 (0x6702b9)
/home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:212 (0x66fad0)
/home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/groupcsm/groupcsm.go:181 (0x671a14)
/home/mg/goworld/src/github.com/mailgun/kafka-pixy/actor/actor.go:68 (0x59d922)
/usr/local/go/src/runtime/asm_amd64.s:1696 (0x45d311)

The issue seems to have been introduced in v0.11.0.

Consumption stops after a Kafka node crash

After a Kafka node crash, consume requests started to return 408 Request Timeout even though there were messages in the queue. During the incident the following line appeared over and over in the logs:

</offsetManager[2]/kraken:ml_count_events2:1[26]/processCommits[0]> failed to fetch initial offset: err=(write tcp 10.223.192.38:48356->10.223.144.128:9092: write: broken pipe)

Unable to receive POST data for Content-Type: application/x-www-form-urlencoded

The POST sample I send works fine for any Content-Type except "application/x-www-form-urlencoded".

send:
curl -X POST localhost:19092/topics/foo/messages?sync -H 'Content-Type:text/plain' -d 'blah blah blah'

Response:
{
"partition": 0,
"offset": 41
}

send:
curl -X POST localhost:19092/topics/foo/messages?sync -H 'Content-Type:application/x-www-form-urlencoded' -d 'blah blah blah'

Response:
{
"error": "Message size does not match Content-Length: expected=14, actual=0"
}

How do I POST a message when the Content-Type is application/x-www-form-urlencoded?
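
Per the Produce section above, with this content type the message presumably has to be passed via the msg form parameter, e.g. (sketch):

curl -X POST 'localhost:19092/topics/foo/messages?sync' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  --data-urlencode 'msg=blah blah blah'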

buffer overflow

#!/bin/bash

## Push test data

function pushit() {
    curl --silent -X POST "http://localhost:19092/topics/foo/messages?sync" -H "Content-Type: text/plain" -d "blah blah blah {}"
}
export -f pushit
time (seq 1000 | parallel -j100 pushit)

## Reset

curl -X POST -H'Content-Type: application/json' -d'[{"partition":0,"offset":0}]' 'http://localhost:19092/topics/foo/offsets?group=bar'
echo
sleep 10

## Pull

function pullit() {
    curl -G --silent 'http://localhost:19092/topics/foo/messages?group=bar' | tr "\n" " "
    echo
}
export -f pullit
time (seq 1000 | parallel -j100 pullit)

So, I'm then running the "pull" step separately (I'm trying to pull the data from Kafka-Pixy with 100 parallel readers) and sometimes I get this:

{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }
{   "error": "\u003c/cons[0]/G:bar[12]/T:foo[0]\u003e buffer overflow" }

as a response.
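
If the error is transient (the consumer's internal buffer overflowing under 100 parallel readers), a client-side retry with a short backoff is one possible workaround. A sketch of pullit with such a loop (assuming the error clears once the buffer drains):

function pullit() {
    local resp attempt
    for attempt in 1 2 3; do
        resp=$(curl -G --silent 'http://localhost:19092/topics/foo/messages?group=bar')
        # Retry only while Kafka-Pixy reports a buffer overflow.
        case "$resp" in
            *'buffer overflow'*) sleep 1 ;;
            *) break ;;
        esac
    done
    printf '%s' "$resp" | tr "\n" " "
    echo
}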

Occasional panics when using the develop branch

Kafka-pixy will occasionally panic when running the maxim/develop branch.

Apr 15 17:00:46.462 INFO  [sarama] client/coordinator coordinator for consumergoup ledger_reboot is #0 (kafka1.ord2.postgun.com:9092)
Apr 15 17:00:46.462 INFO  </offsetManagerFactory[0]/watch4Changes[0]/reassign[160044]> unassign /offsetManagerFactory[0]/ledger_reboot:iad_pipeline_events:8[0] -> /offsetManagerFactory[0]/broker:0[0] (ref=61)
Apr 15 17:00:46.462 INFO  </offsetManagerFactory[0]/watch4Changes[0]/reassign[160044]> assign /offsetManagerFactory[0]/ledger_reboot:iad_pipeline_events:8[0] -> /offsetManagerFactory[0]/broker:0[0] (ref=62)
Apr 15 17:00:46.462 INFO  </offsetManagerFactory[0]/watch4Changes[0]/reassign[160044]> leaving
Apr 15 17:00:46.529 INFO  </offsetManagerFactory[0]/ledger_reboot:pipeline_events:5[0]/processCommits[0]> schedule reassign: reason=offset commit failed, err=(request timeout)
Apr 15 17:00:46.551 INFO  </offsetManagerFactory[0]/ledger_reboot:iad_pipeline_events:10[0]/processCommits[0]> leaving
Apr 15 17:00:46.551 INFO  </consumer[0]/watch4Changes[0]> reassign: change={created=0, outdated=0, closed=1}
Apr 15 17:00:46.551 INFO  </consumer[0]/watch4Changes[0]/reassign[35]> entered: [{created=0, outdated=0, closed=1}]
Apr 15 17:00:46.551 INFO  </offsetManagerFactory[0]/watch4Changes[0]/reassign[160045]> entered: [{created=0, outdated=0, closed=1}]
Apr 15 17:00:46.551 INFO  </consumer[0]/watch4Changes[0]/reassign[35]> unassign /consumer[0]/iad_pipeline_events:10[0] -> /consumer[0]/broker:4[0] (ref=12)
Apr 15 17:00:46.551 INFO  </offsetManagerFactory[0]/watch4Changes[0]/reassign[160045]> unassign /offsetManagerFactory[0]/ledger_reboot:iad_pipeline_events:10[0] -> /offsetManagerFactory[0]/broker:0[0] (ref=61)
Apr 15 17:00:46.551 INFO  </consumer[0]/watch4Changes[0]/reassign[35]> leaving
2016/04/15 17:00:46 http: panic serving @: runtime error: invalid memory address or nil pointer dereference
goroutine 160745 [running]:
net/http.(*conn).serve.func1(0xc823b0ba80)
        /opt/go/src/net/http/server.go:1389 +0xc1
panic(0x8554a0, 0xc8200120a0)
        /opt/go/src/runtime/panic.go:426 +0x4e9
github.com/mailgun/kafka-pixy/apiserver.(*T).handleConsume(0xc82010ec30, 0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/apiserver/apiserver.go:207 +0x40a
github.com/mailgun/kafka-pixy/apiserver.(*T).(github.com/mailgun/kafka-pixy/apiserver.handleConsume)-fm(0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/apiserver/apiserver.go:83 +0x3e
net/http.HandlerFunc.ServeHTTP(0xc8202303a0, 0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /opt/go/src/net/http/server.go:1618 +0x3a
github.com/mailgun/kafka-pixy/vendor/github.com/gorilla/mux.(*Router).ServeHTTP(0xc82010ebe0, 0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/gorilla/mux/mux.go:98 +0x29e
github.com/mailgun/kafka-pixy/vendor/github.com/mailgun/manners.(*gracefulHandler).ServeHTTP(0xc82020a3a0, 0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/vendor/github.com/mailgun/manners/server.go:333 +0x7e
net/http.serverHandler.ServeHTTP(0xc820111000, 0x7f709ba28968, 0xc826846270, 0xc826d33b20)
        /opt/go/src/net/http/server.go:2081 +0x19e
net/http.(*conn).serve(0xc823b0ba80)
        /opt/go/src/net/http/server.go:1472 +0xf2e
created by net/http.(*Server).Serve
        /opt/go/src/net/http/server.go:2137 +0x44e
Apr 15 17:00:46.554 INFO  </smartConsumer[0]/ledger_reboot[0]/iad_pipeline_events:10[0]> partition released
Apr 15 17:00:46.554 INFO  </smartConsumer[0]/ledger_reboot[0]/iad_pipeline_events:10[0]> leaving
panic: runtime error: invalid memory address or nil pointer dereference
[signal 0xb code=0x1 addr=0x48 pc=0x5b422e]

goroutine 135 [running]:
panic(0x8554a0, 0xc8200120a0)
        /opt/go/src/runtime/panic.go:464 +0x3e6
github.com/mailgun/kafka-pixy/consumer.(*exclusiveConsumer).run(0xc8205b45a0)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/smart_consumer.go:342 +0xc2e
github.com/mailgun/kafka-pixy/consumer.(*exclusiveConsumer).(github.com/mailgun/kafka-pixy/consumer.run)-fm()
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/smart_consumer.go:260 +0x20
github.com/mailgun/kafka-pixy/consumer.spawn.func1(0xc8205b4618, 0xc820192980)
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/utils.go:16 +0x44
created by github.com/mailgun/kafka-pixy/consumer.spawn
        /home/mg/goworld/src/github.com/mailgun/kafka-pixy/consumer/utils.go:17 +0x56

Invalid stored offset makes consumer panic

If an offset stored for a topic partition is out of the partition's bounds, then Kafka-Pixy panics on a consumption attempt:

panic: </smartConsumer[1]/group-1[0]/test.1:0[0]> failed to start partition consumer: offset=1152, err=(kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.)

goroutine 704 [running]:
panic(0x4ff6a0, 0xc820680240)
    /usr/local/go/src/runtime/panic.go:464 +0x3e6
github.com/mailgun/kafka-pixy/consumer.(*exclusiveConsumer).run(0xc82051e6c0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/smart_consumer.go:292 +0x6d4
github.com/mailgun/kafka-pixy/consumer.(*exclusiveConsumer).(github.com/mailgun/kafka-pixy/consumer.run)-fm()
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/smart_consumer.go:260 +0x20
github.com/mailgun/kafka-pixy/consumer.spawn.func1(0xc82051e738, 0xc82047f6d0)
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/utils.go:16 +0x44
created by github.com/mailgun/kafka-pixy/consumer.spawn
    /Users/maximvladimirsky/workspace/goworld/src/github.com/mailgun/kafka-pixy/consumer/utils.go:17 +0x56
exit status 2
FAIL    github.com/mailgun/kafka-pixy/consumer  7.868s

That can happen when consumption of a topic has been stopped for a period of time greater than the topic retention period.
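
One way to avoid the panic would be to clamp the stored offset to the partition's currently retained range before starting the partition consumer. A hypothetical sketch using the sarama client (illustration only, not the actual fix):

import "github.com/Shopify/sarama"

// clampOffset returns a committed offset adjusted to fall within the range
// of offsets the broker still retains for the partition.
func clampOffset(client sarama.Client, topic string, partition int32, stored int64) (int64, error) {
	oldest, err := client.GetOffset(topic, partition, sarama.OffsetOldest)
	if err != nil {
		return 0, err
	}
	newest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
	if err != nil {
		return 0, err
	}
	if stored < oldest {
		return oldest, nil // the stored offset was purged by retention
	}
	if stored > newest {
		return newest, nil
	}
	return stored, nil
}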

Topic consumption stops

Sometimes topic consumption stops even though Kafka-Pixy retains ownership of its partitions.

Subscription to a topic fails indefinitely after ZooKeeper connection loss

The following sequence of events was observed:

  1. The subscriber actor failed to subscribe to a new topic with the error zk: could not connect to a server;
  2. More than a minute after the first error a second one appeared, but this time it was zk: node does not exist;
  3. From that moment on the error zk: node does not exist kept reappearing every 500ms.

My theory is that, after the initial connection error, it took the ZooKeeper client so long to re-establish the connection that our ephemeral registration node expired, but the subscriber kept acting as if it existed. To fix this issue we need to make the subscriber handle the case of a missing registration gracefully, as sketched below.
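
A sketch of what that graceful handling could look like with the go-zookeeper client (a hypothetical helper; the registration node path and payload are assumed to be known):

import "github.com/samuel/go-zookeeper/zk"

// ensureRegistration re-creates the ephemeral registration node if it went
// missing, e.g. because the session expired during a long reconnect.
func ensureRegistration(conn *zk.Conn, path string, data []byte) error {
	exists, _, err := conn.Exists(path)
	if err != nil {
		return err
	}
	if exists {
		return nil
	}
	_, err = conn.Create(path, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
	if err == zk.ErrNodeExists {
		err = nil // somebody else re-created it concurrently
	}
	return err
}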

Add offset introspection method

It would be nice to have a GET /topics/{topic}/offsets?group={group} method that would return the oldest and newest offsets for all partitions, along with the respective last committed offset of the specified group.
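
For illustration, a hypothetical request and response shape (the field names here are made up, not an implemented API):

curl -G 'http://localhost:19092/topics/foo/offsets?group=bar'

[
    {"partition": 0, "oldest": 1000, "newest": 2000, "committed": 1500, "lag": 500},
    {"partition": 1, "oldest": 980, "newest": 1800, "committed": 1800, "lag": 0}
]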

More detail on use-cases in relation to microservices

@horkhe
Thanks for this interesting project.
I saw your explanation of the need that kafka-pixy fulfills here:
https://groups.google.com/forum/#!topic/kafka-clients/ZEkcwETZa0A

I'm not sure if you are using any framework for microservices, but we're considering seneca, which has a working interface to Kafka ... https://www.npmjs.com/package/seneca-kafka-transport ... a version of which we currently use, but it's obviously not leveraging some of the benefits that something like kafka-pixy brings.

Would it be possible to continue the conversation offline to understand a little more beyond your explanation in the kafka-user forum?

Thanks,
Colum

Request timeout errors logged by offset manager

Some hosts are generating a bunch of request timeout errors, e.g.:

</cons[0]/G:integrations[0]/P:account_events_1[0]/offset_mgr[0]> trigger reassign: reason=offset commit failed, err=(request timeout)

However, it does not seem to cause either message loss or duplicate consumption.

Messages are skipped by consumer during rebalancing

It was noticed in production that occasionally messages can be skipped during consumption: Kafka-Pixy just never returns them, jumping to newer offsets instead. It is never many, just one or two messages at a time. The losses seem to happen during rebalancing, when a new Kafka-Pixy instance joins a consumer group or an old one leaves it.

Consumption from a topic stopped for a group

Consume requests for a group started to return long-polling timeout errors even though other consumer groups kept consuming messages from the same topic. Consumption resumed after Kafka-Pixy had been restarted. At the time of the incident the following records appeared in the log:

10/16/17, 16:51:37.686 Disposed of child: key=use1_user_events, left=3
10/16/17, 16:51:37.682 Topic subscription expired
10/16/17, 16:51:37.686 Topic subscription expired
10/16/17, 16:51:37.686 Disposed of child: key=use1_pipeline_events, left=2
10/16/17, 16:51:37.696 Submitted: topics=[address_verify use1_pipeline_events use1_scout_recount]
10/16/17, 16:51:37.709 Submitted: topics=[address_verify use1_scout_recount]
10/16/17, 16:51:37.906 Spawning child: key=use1_pipeline_events
10/16/17, 16:51:37.907 Spawning child: key=use1_user_events
10/16/17, 16:51:37.920 Submitted: topics=[address_verify use1_pipeline_events use1_scout_recount]
10/16/17, 16:51:38.136 Submitted: topics=[address_verify use1_pipeline_events use1_scout_recount use1_user_events]
10/16/17, 16:51:38.391 Redundant group update ignored: map[pixy_worker-n06_2017-09-08T13.59.49Z_128791:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n04_2017-09-29T08.39.54Z_125653:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n05_2017-09-08T13.59.48Z_40662:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n02_2017-09-08T13.59.26Z_123919:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n01_2017-09-29T08.37.37Z_69544:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n03_2017-09-08T14.18.32Z_93190:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n07_2017-09-08T14.00.12Z_99606:[address_verify use1_pipeline_events use1_scout_recount use1_user_events]]
10/16/17, 16:51:38.391 Redundant group update ignored: map[pixy_worker-n06_2017-09-08T13.59.49Z_128791:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n04_2017-09-29T08.39.54Z_125653:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n05_2017-09-08T13.59.48Z_40662:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n02_2017-09-08T13.59.26Z_123919:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n01_2017-09-29T08.37.37Z_69544:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n03_2017-09-08T14.18.32Z_93190:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n07_2017-09-08T14.00.12Z_99606:[address_verify use1_pipeline_events use1_scout_recount use1_user_events]]
10/16/17, 16:51:38.391 Fetched subscriptions: map[pixy_worker-n05_2017-09-08T13.59.48Z_40662:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n02_2017-09-08T13.59.26Z_123919:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n01_2017-09-29T08.37.37Z_69544:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n03_2017-09-08T14.18.32Z_93190:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n07_2017-09-08T14.00.12Z_99606:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n06_2017-09-08T13.59.49Z_128791:[address_verify use1_pipeline_events use1_scout_recount use1_user_events] pixy_worker-n04_2017-09-29T08.39.54Z_125653:[address_verify use1_pipeline_events use1_scout_recount use1_user_events]]
