A simple module that exposes some simple methods to work with Kafka topics using {@link https://scramjet.eu/ scramjet}.
Installation:
$ npm install --save scramjet-kafka
Here's a sample case of augmenting a topic using an API: calcalutation of all transactions to a single currency in your kafka topic
await (
require('scramjet-kafka')
.consume(
"zk-kafka01.local:2021", // Zookeeper connection string
"transactions" // Topic
)
.assign( // assign the calcalutation data
async (transaction) => {
if (transaction.currency === "EUR") return { eur_value: transaction.value };
const exchange_rate = await getCurrencyExchangeRate(transaction.currency, "EUR");
return {
eur_value: exchange_rate * transaction.value
}
}
)
.produce( // produce new topic with augmented data
"zk-kafka01.example.com:2021",
"transactionsInEUR"
)
);
Or an even simpler version:
await (
require('scramjet-kafka').augment(
"zk-kafka01.local:2021", // Zookeeper connection string
"transactions", // From Topic
(stream) => stream // Transform the stream however you like
.assign(async (transaction) => ({
eur_value: await getEURValue(transaction.value, transaction.currency)
})),
"transactionsInEUR" // To Topic
)
);
Extends DataStream
A scramjet.DataStream augmented with Kafka specific methods.
options
consumerOptions
Object Consumer options (optional, default{}
)streamOptions
Object Options passed to scramjet
Opens up connection to kafka and starts streaming.
Add topics to the stream
topics
Array<(String | Topic)> list of topics to listen onargs
Array additional arguments toConsumerStream::addTopics
Removes topics from the stream
Commits at the current position
Sets read offset at current position
topic
String topic name in kafkapartition
Number where to start readingoffset
Number kafka partition number
Scramjet StringStream plugin - the following methods are added to all scramjet streams
Plugin to scramjet::StringStream - push to kafka and pull on the other end.
This may be used to allow burst flow above memory limits.
client
(Client | KafkaClient) KafkaNode client to Zookeeper or direcly Kafkatopic
(String | Topic) topic - will be autogenerated if not given (optional, defaultnull
)
Returns Promise resolved when fromTopic stream ends.
Send the stream to the specified Kafka topic.
client
(Client | KafkaClient) KafkaNode.clienttopic
(String | Topic) topic - will be autogenerated if not given (optional, defaultnull
)
Returns Promise resolved when fromTopic stream ends.
Scramjet Kafka module exports
Type: Object
Fetches a stream from Kafka topic performs declared operations and publishes to the another topic.
The transforms as in scramjet can be asynchronous, synchronous, even multi-threaded.
{@see https://scramjet.eu/ Scramjet documentation}
client
(Client | KafkaClient) KafkaNode client to Zookeeper or direcly KafkafromTopic
(String | Topic) topic to consumeuse
UseCallback transforms callback or scramjet moduletoTopic
(String | Topic) topic to produce3
Returns Promise resolved when fromTopic stream ends.
Consume a topic from kafka and return a new KafkaStream
client
(Client | KafkaClient) KafkaNode client to Zookeeper or direcly Kafkatopics
Array<(String | Topic)> Topics to pull from Kafka (optional, default[]
)
topic
String topic name in kafkaoffset
Number where to start readingpartition
Number kafka partition number
MIT - see LICENSE