Giter Club home page Giter Club logo

kafka-rest-node's Introduction

kafka-rest

kafka-rest is a node.js library for the Kafka REST Proxy. It provides a thin wrapper around the REST API, providing a more convenient interface for accessing cluster metadata and producing and consuming Avro and binary data.

NOTE: This library was written to demonstrate how to create a language-specific wrapper around the REST Proxy. It is no longer maintained or kept up to date (e.g. the REST Proxy has a v2 API that is now recommended, but support has not been added to this wrapper library). Since this library was created, better JavaScript clients have been developed that do not require the REST Proxy. In particular, https://github.com/Blizzard/node-rdkafka, which wraps the librdkafka C client is a good choice and actively maintained.

Getting Started

You can install the kafka-rest library via npm:

npm install kafka-rest

The only required dependency is async. However, some additional libraries are required to run the examples and are included as devDependencies.

Next, make sure you have Kafka, the Schema Registry (if using Avro), and the Kafka REST Proxy running. The Confluent Quickstart and REST Proxy documentation explains how to do this step-by-step.

API

Start by requiring the library and creating a KafkaRest object to interact with the server:

var KafkaRest = require('kafka-rest');
var kafka = new KafkaRest({ 'url': 'http://localhost:8082' });

Metadata

The API mirrors the REST API closely: there are resources for Brokers, Topics, Partitions, and Consumers, and these are available in the KafkaRest object. For example, to get a list of brokers (we'll skip error checking to keep things simple):

// kafka.brokers is a Brokers instance, list() returns a list of Broker instances
kafka.brokers.list(function(err, brokers) {
    for(var i = 0; i < brokers.length; i++)
        console.log(brokers[i].toString());
});

Objects generated from API responses will have a field raw where you can get at the raw response data. For the brokers, brokers[i].raw would just be a number because the broker list API only returns broker IDs.

All types have both collection (Topics) and singular (Topic) types corresponding to the /topics and /topics/<topic> API endpoints. The collection types primary method is list() which returns a list of singular types which are initialize only with the data returned by the API call to get the list of objects. Singular objects can be constructed directly, or there is a shorthand via the collection type:

kafka.topics.get('test', function(err, topic) {
    // topic is a Topic object
});

And you can create an incomplete singular object (i.e. no API call is made, only its identifying properties are stored):

kafka.topics.topic('test');

This can be useful when you want to get directly at a nested resource you know the path to and don't want to do API calls on the intermediate resources (i.e. get a single partition, but don't look up topic metadata):

// The complete version makes the resource hierarchy clear
kafka.topics.topic('test').partitions.partition(0);
// Or use the shorter version. Under the hood they do the same thing, so you
// can use whichever is clearer based on context
kafka.topic('test').partition(0);
// For partitions there's also:
kafka.topicPartition('test', 0)

If you have an incomplete singular object returned by a list() call, you can fill request its data using get().

Producing

You can produce messages by calling produce() on Topic or Partition objects. They can be incomplete instances, e.g.:

kafka.topic('test').produce('message')

With this one method you can:

  • Specify values by passing them in directly, or records with any combination of key, value, and partition fields. (Only Topic objects support the partition field).
  • Send a batch of messages by passing them as separate arguments or as an array. Messages may be of mixed form, i.e. some may be raw values, others may be records with key and value fields.
  • Send raw binary data (null, string, or Buffer objects) or Avro data (in JSON form) with a schema. For Avro data, you can pass up to two AvroSchema objects. If one is included, it is used as the value schema; if two are include the first is the key schema, the second is the value schema.
  • Add a callback of the form function(err, res) anywhere in the argument list to be notified when the request completes. res is the JSON response returned by the proxy.

Here are a few examples showing these features:

// With binary data:

// Single message with a string value
topic.produce('msg1');

// Single message with key, value, and partition, with callback
topic.produce({'key': 'key1', 'value': 'msg1', 'partition': 0}, function(err,res){});

// Any record fields can be omitted
topic.produce({'partition': 0, 'value': 'msg1'});

// Multiple messages containing only values
topic.produce('msg1', 'msg2', 'msg3');

// Multiple messages containing only values, passed as array
topic.produce(['msg1', 'msg2', 'msg3']);

// Multiple messages, mixed format
topic.produce('msg1', {'partition': 0, 'value': 'msg2'});


// With Avro data:

var userIdSchema = new KafkaRest.AvroSchema("int");
var userInfoSchema = new KafkaRest.AvroSchema({
    "name": "UserInfo",
    "type": "record",
    "fields": [
        { "name": "id", "type": "int" },
        { "name": "name", "type": "string" }]
});

// Avro value schema followed by messages containing only values
topic.produce(userInfoSchema, {'avro': 'record'}, {'avro': 'another record'});

// Avro key and value schema.
topic.produce(userIdSchema, userInfoSchema, {'key': 1, 'value': {'id': 1, 'name': 'Bob'}});

To avoid sending schemas with every request, the REST API supports schema IDs. Use of schema IDs is handled transparently for you -- as long as you use the same AvroSchema object across calls to produce(), the IDs will be used instead of the full schemas.

Note that because this API is a thin wrapper around the REST Proxy, you must batch your messages to improve performance. The twitter/stream_tweets.js example performs this type of batching.

Consumers

Currently the REST proxy supports the high-level consumer interface using consumer groups. To start consuming data, join a consumer group, optionally specifying some configuration options (passed directly to the API call):

kafka.consumer("my-consumer-group").join({
    "format": "avro",
    "auto.commit.enable": "true",
}, function(err, consumer_instance) {
    // consumer_instance is a ConsumerInstance object
});

The group doesn't have to exist yet -- if you use a new consumer group name, it will be created. You can then subscribe to a topic, resulting in a ConsumerStream, and setup event handlers:

var stream = consumer_instance.subscribe('my-topic')
stream.on('data', function(msgs) {
    for(var i = 0; i < msgs.length; i++)
        console.log("Got a message: key=" + msgs[i].key + " value=" + msgs[i].value + " partition=" + msgs[i].partition);
});
stream.on('error', function(err) {
    console.log("Something broke: " + err);
});

The exact type for each messages key/value depends on the data format you're reading. Binary data will have been decoded from its base64 representation into a Buffer (or null). For Avro, you'll get an object.

Finally, when you're ready to clean up, request a graceful shutdown of the consumer instance, which also cleans up the stream:

consumer_instance.shutdown(function() {
    console.log("Shutdown complete.");
});

Examples

A few examples are included in the examples/ directory:

  • metadata.js - A simple demo of some of the metadata APIs, covering brokers, topics, and partitions.
  • console_producer.js - Reads from stdin and produces each line as a message to a Kafka topic.
  • console_consumer.js - Consumes a Kafka topic and writes each message to stdout.
  • twitter/stream_tweets.js - Uses Twitter's API to get a realtime feed of tweets which it produces to a Kafka topic.
  • twitter/trending.js - Uses the tweet data produced by the previous example to generate a list of trending hashtags, which it prints every 10 seconds to stdout.

Contribute

License

The project is licensed under the Apache 2 license.

kafka-rest-node's People

Contributors

atridgedcosta avatar confluenttools avatar david-shepard avatar dennybritz avatar dminkovsky avatar ewencp avatar george24601 avatar kjvalencik avatar nnam avatar toaderflorin avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafka-rest-node's Issues

Running in Cluster Mode for Node

While running the kafka-rest-node in cluster mode how do we ensure that topic consumption is load balanced between each worker.

My rest kafka consumer problem

var KafkaRest = require('kafka-rest');
var kafka = new KafkaRest({ 'url': 'http://192.168.1.98:8082' });
kafkaconsumer();
function kafkaconsumer(){
    kafka.consumer("my-consumer-group").join({
        "format": "avro",
        "auto.commit.enable": "true",
    }, function(err, consumer_instance) {
        var stream = consumer_instance.subscribe('dnbtopic')
stream.on('data', function(msgs) {
    for(var i = 0; i < msgs.length; i++)
        console.log("Got a message: key=" + msgs[i].key + " value=" + msgs[i].value + " partition=" + msgs[i].partition);
});
stream.on('error', function(err) {
    console.log("Something broke: " + err);
});
    });

}

Response :

PS C:\Users\Alper-DNB'\kafka-rest-node-master> node consumer
Something broke: APIError: HTTP 500 Kafka error: null

CORS

Getting cors issue when using in reactJS

Producer (with avro) giving error after 1000 messages

When using the kafka-rest node module with avro and schema registry, the messages starts giving "500 internal server" error after producing 1000 messages (1000 separate produce requests). Based on my understanding this issue happens when we don't reuse the schema object. The kafka-rest documentation says that this is handled transparently for us. I don't see that happening. Am I missing something?

I don't see this issue when I use schema id with kafka rest proxy api directly.

Json schema, how to support?

With the current approach I can not send and receive just JSON data through rest-proxy. It is only Avro or binary.

'socket.timeout.ms' is not working with that driver

Hi.
I tried to consume from confluent 2.0 package and got error.
consume looked like:

var kafkaRest = require('kafka-rest');

var kafka = new kafkaRest({ url: restProxyServerUrl });

kafka.consumer(myConsumerGroup).join(
  { 'format': `avro`, 'auto.offset.reset': `smallest`, 'socket.timeout.ms': `20000` }
  , function(error, consumer) {
    console.log(error);
  });

error is :

 [APIError: Unrecognized field: socket.timeout.ms]
auditclient_1 |   name: 'APIError',
auditclient_1 |   status: 422,
auditclient_1 |   message: 'Unrecognized field: socket.timeout.ms',
auditclient_1 |   data:
auditclient_1 |    { error_code: 422,
auditclient_1 |      message: 'Unrecognized field: socket.timeout.ms' } }

although that on kafka documentation there's that option:

socket.timeout.ms   | 30 * 1000 | The socket timeout for network requests. The actual timeout set will be max.fetch.wait + socket.timeout.ms.

Ability to commit offset of consumer

Hi.
There is an option for the Kafka Rest Proxy to commit offsets as you can see here.
I want this options to be available for a consumer i created.
something like :

var kafkaRest = require('kafka-rest');

var kafka = new kafkaRest({ url: restProxyServerUrl });

kafka.consumer(myConsumerGroup).join(
  { 'format': `avro`, 'auto.offset.reset': `smallest`, `socket.timeout.ms`: `20000` }
  , function(error, consumer) {
    var listener = consumer.subscribe(topic);

    listener.on(`data`, function(messages) {
      consumer.commitOffset();
    });
  });

notice the consumer.commitOffset(); - it is like making a HTTP request to this with the consumerGroup and raw name from the instance.

ConsumerStream delay between requests

It seems like the ConsumerStream is currently polling in a tight loop. This may result in the API server being overwhelmed with requests. If a topic receives messages only sporadically this can be inefficient. Depending on the topic the user may also be willing to accept a certain delay in exchange for better performance/batching.

Would it make sense to expose an option to introduce delay between successive topic GET requests?

Is this repository still being maintained?

Hello, I'm very excited when a big guy like Confluentinc implement Kafka-rest client library for node. But it seems not working properly because every request to server make Kafka throws InvalidReceiveException. Therefore I wonder whether this repository is still being maintained or not?

BTW, here is the stacktrace:

[2017-09-27 12:23:50,115] WARN Unexpected error from /172.18.0.6; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1195725856 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:95)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:75)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:203)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:167)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:381)
at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
at kafka.network.Processor.poll(SocketServer.scala:500)
at kafka.network.Processor.run(SocketServer.scala:435)
at java.lang.Thread.run(Thread.java:745)

when I'm trying to call kafka.brokers.list().

Docker containers I'm using:

8414f70f40f8        confluentinc/cp-kafka-rest:3.3.0        "/etc/confluent/do..."   22 minutes ago      Up 22 minutes       0.0.0.0:8082->8082/tcp                       tos_kafka-rest_1
3beaa2c41c1e        confluentinc/cp-schema-registry:3.3.0   "/etc/confluent/do..."   22 minutes ago      Up 22 minutes       0.0.0.0:8081->8081/tcp                       tos_schema-registry_1
447f26233264        confluentinc/cp-kafka:3.3.0             "/etc/confluent/do..."   22 minutes ago      Up 22 minutes       0.0.0.0:9092->9092/tcp                       tos_kafka_1
9ca96e4ca75c        confluentinc/cp-zookeeper:3.3.0         "/etc/confluent/do..."   22 minutes ago      Up 22 minutes       2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   tos_zookeeper_1

Thank you for reading this.

Production Ready?

I see a beta version in your package.json (i.e. pre 1.0.0) and a very small set of tests. The basic scenario I am setting up while following the instructions in the README is failing. So, is this node module production ready?

Avro data examples throw APIError: HTTP 422 Invalid schema

When running either of the two avro examples in the README.md I get the following error

APIError: HTTP 422 Invalid schema: org.codehaus.jackson.JsonParseException: Unexpected character ('i' (code 105)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@637ee04; line: 1, column: 2]

Create Topic API

This is a question more about kafka-rest-proxy, than this node module itself.

I don't think there is a create topics API right now. Is there any plans to change that? What suggestions to you have for programmatically creating a topic from node if there isn't a create topic API? Thanks for your help!

Cory Parrish

Avro data examples throw APIError: HTTP 422 Invalid schema

When running either of the two avro examples in the README.md I get the following error

APIError: HTTP 422 Invalid schema: org.codehaus.jackson.JsonParseException: Unexpected character ('i' (code 105)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
at [Source: java.io.StringReader@637ee04b; line: 1, column: 2]

In looking at the code it appears as if the avro schema is being stringified twice before being sent to the REST Proxy.

ConsumerStream.prototype._read fires when this.active === false

I have implemented a Consumer from https://github.com/confluentinc/kafka-rest-node/blob/master/examples/console_consumer.js

Occasionally I am getting a 500 error when shutting down.

20:37:50.226Z  INFO: Attempting to shut down consumer instance...
20:37:50.333Z  INFO: Consumer stream closed.
20:37:50.350Z  INFO: Consumer instance closed.
20:37:50.351Z  INFO: Consumer has shut down.
20:37:50.353Z ERROR: Consumer instance reported an error: { error_code: 500, message: 'Internal Server Error' }
20:37:50.353Z  INFO: Attempting to shut down consumer instance...

Logs from kafka-rest also show a DELETE happens, then a GET happens.

[2017-06-01 20:46:39,625] INFO 127.0.0.1 - - [01/Jun/2017:20:46:39 +0000] "DELETE /consumers/mytopic-consumer-1496349998/instances/rest-consumer-4820caa6-d67d-4588-95ff-072c251d59a3 HTTP/1.1" 204 0  14 (io.confluent.rest-utils.requests:77)
[2017-06-01 20:46:39,628] INFO 127.0.0.1 - - [01/Jun/2017:20:46:39 +0000] "GET /consumers/mytopic-consumer-1496349998/instances/rest-consumer-4820caa6-d67d-4588-95ff-072c251d59a3/topics/mytopic HTTP/1.1" 500
 52  17 (io.confluent.rest-utils.requests:77)

During debugging of ConsumerStream, I noticed _read will fire after a shutdown. this.active is set to false, but the client request is still sent which results in a 500 error.

[Error: getaddrinfo EMFILE]

I am using confluent kafka platform . I have a topic with 4 partition and replication factor of 2. Single zookeeper, three brokers and kafka-rest proxy server. Now I am load testing the system with siege running 1000 users with a list of api which in turn hit kafka producer. I have my producer and consumer using the rest proxy (kafka-rest). I am getting following issue:
{ [Error: getaddrinfo EMFILE] code: 'EMFILE', errno: 'EMFILE', syscall: 'getaddrinfo' }
In kafka-rest log I can see:
[2016-02-23 07:13:51,972] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 14 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,973] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 15 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,974] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 12 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,978] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 6 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,983] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 6 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,984] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 4 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,985] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 7 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,993] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 3 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,994] INFO 127.0.0.1 - - [23/Feb/2016:07:13:51 +0000] "POST /topics/endsession HTTP/1.1" 200 120 4 (io.confluent.rest-utils.requests:77) [2016-02-23 07:13:51,999] WARN Accept failed for channel java.nio.channels.SocketChannel[closed] (org.eclipse.jetty.io.SelectorManager:714) java.io.IOException: Too many open files at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method) at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422) at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250) at org.eclipse.jetty.io.SelectorManager$ManagedSelector.processAccept(SelectorManager.java:706) at org.eclipse.jetty.io.SelectorManager$ManagedSelector.processKey(SelectorManager.java:648) at org.eclipse.jetty.io.SelectorManager$ManagedSelector.select(SelectorManager.java:611) at org.eclipse.jetty.io.SelectorManager$ManagedSelector.run(SelectorManager.java:549) at org.eclipse.jetty.util.thread.NonBlockingThread.run(NonBlockingThread.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555) at java.lang.Thread.run(Thread.java:745)
So I went through a lot of questions related to this. Set my ec2 machine paramenters so that I dont get too many open file error. But its not solved. I have reduced the TIME_WAIT to 30 seconds. ulimit -n is 80000.

I have collected some stats and look like kafka rest proxy which is running on `localhost:8082 causing too many connections. How do I solve this issue? Also sometimes when error is coming and then I stop my siege test but again when TIME_WAIT connections are reduced, I restart my load test with 1 user only still I see the same issue. Some issue in rest proxy wrapper for node js?

Alos sometimes I can see EMFILE error on producer but no error in kafka-rest server log.

Issue with union types when sending Avro data

I am sending avro data with a schema that has a union type defined as one of my fields. As below:
{ "name" : "applicationId", "type": ["null", "string"], "default": null }

When I send a json that contains an actual value for this field or whether it is missing from the json totally, which would be the null case, I am getting the error below :

APIError: HTTP 422 Conversion of JSON to Avro failed: Failed to convert JSON to Avro: Expected start-union. Got VALUE_STRING

Is this not a valid schema definition and valid use case ?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.