
kafka-unit's Introduction

Kafka Unit Testing


Allows you to start and stop a Kafka broker + ZooKeeper instance for unit testing applications that communicate with Kafka.

Versions

kafka-unit   Kafka broker          ZooKeeper
1.0          kafka_2.11:0.11.0.0   3.4.10
0.7          kafka_2.11:0.10.0.2   3.4.10
0.6          kafka_2.11:0.10.0.0   3.4.6
0.5          kafka_2.11:0.9.0.1    3.4.6
0.4          kafka_2.11:0.9.0.1    3.4.6
0.3          kafka_2.11:0.8.2.2    3.4.6
0.2          kafka_2.11:0.8.2.1    3.4.6

Maven Central

<dependency>
    <groupId>info.batey.kafka</groupId>
    <artifactId>kafka-unit</artifactId>
    <version>1.0</version>
</dependency>

Starting manually

To start both a Kafka broker and a ZooKeeper instance on random ports, use the following code:

KafkaUnit kafkaUnitServer = new KafkaUnit();
kafkaUnitServer.startup();
kafkaUnitServer.shutdown();
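
In a real test you will usually want the broker shut down even if the test body throws. A minimal sketch of that pattern, using only the calls shown above:

KafkaUnit kafkaUnitServer = new KafkaUnit();
kafkaUnitServer.startup();
try {
    // interact with the embedded broker here
} finally {
    // always release the ports and temporary log directories, even if the test fails
    kafkaUnitServer.shutdown();
}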

ZooKeeper and Kafka broker ports can be specified explicitly using the second constructor, which takes two ints:

KafkaUnit kafkaUnitServer = new KafkaUnit(5000, 5001);

An alternative constructor accepts connection strings rather than ports, which can be convenient if you want to reuse existing configuration without parsing it to extract the port numbers:

KafkaUnit kafkaUnitServer = new KafkaUnit("localhost:5000", "localhost:5001");

Currently only localhost is supported, and the connection string must consist of exactly one localhost:[port] pair.
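
For illustration, here is one way that reuse might look; the property file path and keys below are hypothetical, not part of kafka-unit:

// Hypothetical test config containing e.g. zookeeper.connect=localhost:5000 and bootstrap.servers=localhost:5001
Properties cfg = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("src/test/resources/kafka-test.properties"))) {
    cfg.load(in);
}

KafkaUnit kafkaUnitServer = new KafkaUnit(
        cfg.getProperty("zookeeper.connect"),
        cfg.getProperty("bootstrap.servers"));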

You can then write your own code to interact with Kafka or use the following methods:

String testTopic = "TestTopic";
kafkaUnitServer.createTopic(testTopic);
ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");
kafkaUnitServer.sendMessages(keyedMessage);

And to read messages:

List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);
List<String> allMessages = kafkaUnitServer.readAllMessages(testTopic);

Only String messages are supported at the moment.
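
Putting the helpers together, a typical round trip through the embedded broker looks like this (a sketch using only the methods shown above):

String topic = "RoundTripTopic";
kafkaUnitServer.createTopic(topic);
kafkaUnitServer.sendMessages(new ProducerRecord<>(topic, "key", "value"));

// readMessages waits until the expected number of messages has arrived (or times out)
List<String> received = kafkaUnitServer.readMessages(topic, 1);
assertEquals(Arrays.asList("value"), received);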

Alternatively, you can use getKafkaConnect() to configure producer and consumer clients yourself, for example:

Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getCanonicalName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getCanonicalName());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUnitServer.getKafkaConnect());

Producer<Long, String> producer = new KafkaProducer<>(props);
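
A consumer can be wired up against the same embedded broker in the same way. The sketch below uses the standard Kafka consumer API; the group id and poll timeout are arbitrary choices, not kafka-unit requirements:

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUnitServer.getKafkaConnect());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getCanonicalName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getCanonicalName());

try (Consumer<Long, String> consumer = new KafkaConsumer<>(consumerProps)) {
    consumer.subscribe(Collections.singletonList(testTopic));
    // poll(long) is the pre-2.0 client signature; it returns whatever records arrived within the timeout
    ConsumerRecords<Long, String> records = consumer.poll(1000);
}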

Using the JUnit Rule

If you don't want to start/stop the server manually, you can use the JUnit rule, e.g.

public class KafkaUnitIntegrationTest {

    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();

    @Test
    public void junitRuleShouldHaveStartedKafka() throws Exception {
        String testTopic = "TestTopic";
        kafkaUnitRule.getKafkaUnit().createTopic(testTopic);
        ProducerRecord<String, String> keyedMessage = new ProducerRecord<>(testTopic, "key", "value");

        kafkaUnitRule.getKafkaUnit().sendMessages(keyedMessage);
        List<String> messages = kafkaUnitRule.getKafkaUnit().readMessages(testTopic, 1);

        assertEquals(Arrays.asList("value"), messages);
    }
}

This starts and stops the broker for every test, so one test cannot interfere with the next. Unlike the KafkaUnit() constructor, the rule does not throw a checked IOException when socket initialization fails; it wraps it in a runtime exception, which makes it suitable for use as a @Rule field in tests.

If you want to start the server on specific ports, use the KafkaUnitRule(int, int) or KafkaUnitRule(String, String) constructor, which accept ZooKeeper and Kafka broker ports or connection strings respectively (just like the corresponding KafkaUnit constructors), e.g.:

    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(5000, 5001);
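
or, using connection strings (with the same localhost-only restriction described above):

    @Rule
    public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule("localhost:5000", "localhost:5001");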

License

Copyright 2013 Christopher Batey

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

kafka-unit's People

Contributors

chbatey, choang, craigwilliams84, cwensel, edwardmlyte, felipefzdz, franekrichardson, jvrmaia, qnzvna, reda-alaoui, regispl, sakanaou, soid, xaerxess


kafka-unit's Issues

Why limit to localhost only?

Thanks for this useful project. I need to run an integration test for a Kafka consumer that is deployed to a running Docker container with Arquillian. I have trouble accessing the kafka-unit server, which itself lives outside the Docker container (on localhost). The only way to get this to work (in manual testing) is to start the Kafka server, as well as the consumer, on the Docker bridge interface.

BadVersionException thrown on createTopic

Hello:

I am exec'ing:

    kafkaUnitServer = new KafkaUnit(zooPort, brokerPort); // some ints
    kafkaUnitServer.createTopic(testTopic); // some String

but it yields:

java.lang.NoClassDefFoundError: org/apache/zookeeper/KeeperException$BadVersionException

when this lib calls:

        ZkUtils zkUtils = ZkUtils.apply(opts.options().valueOf(opts.zkConnectOpt()),
                30000, 30000, JaasUtils.isZkSecurityEnabled());

any tips? thx!

Cannot connect to ZooKeeper when creating a topic

Hi
I receive an exception when I create a topic:

org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)
at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)
at kafka.utils.ZkUtils.apply(ZkUtils.scala)
at info.batey.kafka.unit.KafkaUnit.createTopic(KafkaUnit.java:170)
at info.batey.kafka.unit.KafkaUnit.createTopic(KafkaUnit.java:153)

and my code is like this:
import com.google.common.base.Throwables;
import info.batey.kafka.unit.KafkaUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestClass {

    private static final int KAFKA_PORT = 2181;
    private static final int ZK_PORT = 9092;

    private static final String SOURCE_TOPIC = "source-topic";
    private static final String ISSUE_TOPIC = "issue-topic";
    private static final String CRASH_TOPIC = "crash-topic";

    private KafkaUnit kafkaServer;

    @Before
    public void setup() {
        try {
            kafkaServer = new KafkaUnit(ZK_PORT, KAFKA_PORT);

            kafkaServer.createTopic(SOURCE_TOPIC);
            kafkaServer.createTopic(ISSUE_TOPIC);
            kafkaServer.createTopic(CRASH_TOPIC);
        } catch (Exception e) {
            Throwables.propagate(e);
        }
        kafkaServer.startup();
    }

    @Test
    public void testConsumer() {
    }

    @After
    public void tearDown() {
        if (null != kafkaServer)
            kafkaServer.shutdown();
    }
}

I don't know how to fix this. Any suggestions would be appreciated!

Timed out waiting for messages

Please make the amount of time to wait for reading messages configurable.
KafkaUnit:

public List<String> readMessages(String topicName, final int expectedMessages) throws TimeoutException { ... return submit.get(3, TimeUnit.SECONDS);
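
Until that timeout is configurable, one possible workaround is to retry the call under an overall deadline of your own. This is only a sketch; it assumes readMessages throws java.util.concurrent.TimeoutException as in the excerpt above, and the 30-second budget is arbitrary:

List<String> messages = null;
long deadline = System.currentTimeMillis() + 30_000; // arbitrary overall budget
while (messages == null) {
    try {
        messages = kafkaUnitServer.readMessages(topicName, expectedMessages);
    } catch (TimeoutException e) {
        // readMessages already waited ~3 seconds internally; give up once the deadline passes
        if (System.currentTimeMillis() > deadline) {
            throw e;
        }
    }
}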

Support for kafka connect

Hi,

We're currently finishing off adding support for Kafka Connect (we need it for a new connector we are developing). Would you welcome a PR to add this? We would rather work off that than a fork.

Cheers,

Franek.

Kafka 11.x.y version bump release?

It would be great to see a .8 release with bumped Kafka/ZK versions (and my recent patch).

If there are no plans, I can publish under my own maven group.

ckw

Code difference between maven artifact scalatest-embedded-kafka_2.11

using sbt
class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) {

def this(props: java.util.Map[_, _]) = this(props, true)

using maven
class KafkaConfig private(val props: VerifiableProperties) extends

This caused the test to abort with a NoSuchMethodError.

kafka client 0.9.0.1

Hi there.
I'm trying to get Kafka Unit integrated with a kafka consumer of the new flavor as described in pom:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

I'm able to produce messages, but the consumer is somehow not able to consume, although nothing in the logs appears out of the ordinary. I'm able to use an off-process Kafka server for my unit tests at this time, but it's not ideal.

Here are some logs that indicate client connection success:

2016-05-20 11:05:54 DEBUG NetworkClient:619 - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=17,client_id=consumer-1}, body={topics=[test-b-topic]}), isInitiatedByNetworkClient, createdTimeMs=1463756754256, sendTimeMs=0) to node 1
2016-05-20 11:05:54 INFO  GroupMetadataManager:68 - [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,45] in 1 milliseconds.
2016-05-20 11:05:54 INFO  GroupMetadataManager:68 - [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,48]
2016-05-20 11:05:54 DEBUG Metadata:172 - Updated cluster metadata version 10 to Cluster(nodes = [Node(1, localhost, 6692)], partitions = [Partition(topic = test-b-topic, partition = 0, leader = 1, replicas = [1,], isr = [1,]])
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:Name: TopicMetadataRequest; Version: 0; CorrelationId: 17; ClientId: consumer-1 from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:1,requestQueueTime:0,localTime:1,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG AbstractCoordinator:465 - Issuing group metadata request to broker 1
2016-05-20 11:05:54 INFO  GroupMetadataManager:68 - [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,48] in 1 milliseconds.
2016-05-20 11:05:54 DEBUG AbstractCoordinator:478 - Group metadata response ClientResponse(receivedTimeMs=1463756754261, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@76c76263, request=RequestSend(header={api_key=10,api_version=0,correlation_id=18,client_id=consumer-1}, body={group_id=test-group-id}), createdTimeMs=1463756754258, sendTimeMs=1463756754258), responseBody={error_code=0,coordinator={node_id=1,host=localhost,port=6692}})
2016-05-20 11:05:54 DEBUG NetworkClient:487 - Initiating connection to node 2147483646 at localhost:6692.
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:{api_key=10,api_version=0,correlation_id=18,client_id=consumer-1} -- {group_id=test-group-id} from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:3,requestQueueTime:0,localTime:3,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:247 - Revoking previously assigned partitions []
2016-05-20 11:05:54 DEBUG Acceptor:52 - Accepted connection from /127.0.0.1 on /127.0.0.1:6692. sendBufferSize [actual|requested]: [102400|102400] recvBufferSize [actual|requested]: [102400|102400]
2016-05-20 11:05:54 DEBUG AbstractCoordinator:309 - (Re-)joining group test-group-id
2016-05-20 11:05:54 DEBUG Processor:52 - Processor 0 listening to new connection from /127.0.0.1:43096
2016-05-20 11:05:54 DEBUG AbstractCoordinator:318 - Issuing request (JOIN_GROUP: {group_id=test-group-id,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]}) to coordinator 2147483646
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name node-2147483646.bytes-sent
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name node-2147483646.bytes-received
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name node-2147483646.latency
2016-05-20 11:05:54 DEBUG NetworkClient:467 - Completed connection to node 2147483646
2016-05-20 11:05:54 INFO  GroupCoordinator:68 - [GroupCoordinator 1]: Preparing to restabilize group test-group-id with old generation 0
2016-05-20 11:05:54 INFO  GroupCoordinator:68 - [GroupCoordinator 1]: Stabilized group test-group-id generation 1
2016-05-20 11:05:54 DEBUG AbstractCoordinator:336 - Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,members=[{member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]}
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:{api_key=11,api_version=0,correlation_id=19,client_id=consumer-1} -- {group_id=test-group-id,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=24 cap=24]}]} from connection 127.0.0.1:6692-127.0.0.1:43096;totalTime:12,requestQueueTime:0,localTime:12,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:225 - Performing range assignment for subscriptions {consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@5d4a76ed}
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:229 - Finished assignment: {consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@5a354c6d}
2016-05-20 11:05:54 DEBUG AbstractCoordinator:397 - Issuing leader SyncGroup (SYNC_GROUP: {group_id=test-group-id,generation_id=1,member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,group_assignment=[{member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]}]}) to coordinator 2147483646
2016-05-20 11:05:54 INFO  GroupCoordinator:68 - [GroupCoordinator 1]: Assignment received from leader for group test-group-id for generation 1
2016-05-20 11:05:54 DEBUG Log:52 - Flushing log '__consumer_offsets-38 up to offset 1, last flushed: 1463756754150 current time: 1463756754292 unflushed = 1
2016-05-20 11:05:54 DEBUG ReplicaManager:52 - [Replica Manager on Broker 1]: Request key __consumer_offsets-38 unblocked 0 fetch requests.
2016-05-20 11:05:54 DEBUG Partition:52 - Partition [__consumer_offsets,38] on broker 1: High watermark for partition [__consumer_offsets,38] updated to 1 [0 : 260]
2016-05-20 11:05:54 DEBUG ReplicaManager:52 - [Replica Manager on Broker 1]: Request key __consumer_offsets-38 unblocked 0 fetch requests.
2016-05-20 11:05:54 DEBUG ReplicaManager:52 - [Replica Manager on Broker 1]: Request key __consumer_offsets-38 unblocked 0 producer requests.
2016-05-20 11:05:54 DEBUG ReplicaManager:52 - [Replica Manager on Broker 1]: Produce to local log in 17 ms
2016-05-20 11:05:54 DEBUG AbstractCoordinator:423 - Received successful sync group response for group test-group-id: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]}
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:{api_key=14,api_version=0,correlation_id=20,client_id=consumer-1} -- {group_id=test-group-id,generation_id=1,member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,group_assignment=[{member_id=consumer-1-2ef8ed6a-4ab2-42a8-8c4b-455aaad2c137,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=32 cap=32]}]} from connection 127.0.0.1:6692-127.0.0.1:43096;totalTime:29,requestQueueTime:0,localTime:29,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:191 - Setting newly assigned partitions [test-b-topic-0]
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:581 - Fetching committed offsets for partitions: [test-b-topic-0]
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:Name: OffsetFetchRequest; Version: 1; CorrelationId: 21; ClientId: consumer-1; GroupId: test-group-id from connection 127.0.0.1:6692-127.0.0.1:43096;totalTime:7,requestQueueTime:2,localTime:5,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG ConsumerCoordinator:628 - No committed offset for partition test-b-topic-0
2016-05-20 11:05:54 DEBUG Fetcher:290 - Resetting offset for partition test-b-topic-0 to latest offset.
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:Name: OffsetRequest; Version: 0; CorrelationId: 22; ClientId: consumer-1; ReplicaId: -1 from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:9,requestQueueTime:1,localTime:8,remoteTime:0,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG Fetcher:483 - Fetched offset 0 for partition test-b-topic-0
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name FetchThrottleTime-consumer-1
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name Fetch-consumer-1
2016-05-20 11:05:54 DEBUG logger:138 - Completed request:Name: FetchRequest; Version: 1; CorrelationId: 23; ClientId: consumer-1; ReplicaId: -1; MaxWait: 500 ms; MinBytes: 1 bytes from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:514,requestQueueTime:2,localTime:4,remoteTime:501,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name topic.test-b-topic.bytes-fetched
2016-05-20 11:05:54 DEBUG Metrics:201 - Added sensor with name topic.test-b-topic.records-fetched
2016-05-20 11:05:55 DEBUG logger:138 - Completed request:Name: FetchRequest; Version: 1; CorrelationId: 24; ClientId: consumer-1; ReplicaId: -1; MaxWait: 500 ms; MinBytes: 1 bytes from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:501,requestQueueTime:0,localTime:0,remoteTime:500,responseQueueTime:0,sendTime:1,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:55 DEBUG logger:138 - Completed request:Name: FetchRequest; Version: 1; CorrelationId: 25; ClientId: consumer-1; ReplicaId: -1; MaxWait: 500 ms; MinBytes: 1 bytes from connection 127.0.0.1:6692-127.0.0.1:43094;totalTime:501,requestQueueTime:0,localTime:0,remoteTime:500,responseQueueTime:0,sendTime:0,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS
2016-05-20 11:05:56 DEBUG FinalRequestProcessor:88 - Processing request:: sessionid:0x154ceb36b800000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
2016-05-20 11:05:56 DEBUG FinalRequestProcessor:160 - sessionid:0x154ceb36b800000 type:ping cxid:0xfffffffffffffffe zxid:0xfffffffffffffffe txntype:unknown reqpath:n/a
2016-05-20 11:05:56 DEBUG ClientCnxn:717 - Got ping response for sessionid: 0x154ceb36b800000 after 0ms

Failed to construct Kafka Producer

In a unit test that I am writing, I use the code below to check that my Kafka consumer receives messages from any relevant producer. My consumer code is wrapped within the KafkaStreamObtainer class.

package com.termmerge.nlpcore.obtainer;

import junit.framework.TestCase;
import org.junit.Test;
import org.junit.ClassRule;
import com.github.charithe.kafka.KafkaJunitRule;
import com.github.charithe.kafka.EphemeralKafkaBroker;
import net.jodah.concurrentunit.Waiter;

import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;


public class KafkaStreamObtainerTest extends TestCase
{

  @ClassRule
  public KafkaJunitRule kafkaRule =
          new KafkaJunitRule(EphemeralKafkaBroker.create());

  @Test
  public void testOneMessage() throws Throwable
  {
    Waiter waiter = new Waiter();

    KafkaProducer testProducer = kafkaRule.helper().createStringProducer();
    testProducer.send(
            new ProducerRecord<>("testTopic", "testKey", "testValue")
    );
    testProducer.close();

    Map consumerSettings = new Properties();
    consumerSettings.put(
            "connection_string",
            "localhost:" + Integer.toString(kafkaRule.helper().kafkaPort())
    );
    consumerSettings.put("group_id", "test");
    KafkaStreamObtainer kafkaStream =
            new KafkaStreamObtainer(consumerSettings);
    kafkaStream.addListener((record) -> {
      waiter.assertEquals(record.get("key"), "testKey");
      waiter.assertEquals(record.get("value"), "testValue");
      waiter.resume();
    });
    kafkaStream.listenToStream("testTopic");

    waiter.await(50000);
  }

}

I keep getting the following error:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

I'm completely stumped. What could possibly cause this?

Commons-io is pulling 1.3.2

[INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli) @ job ---
[INFO] Verbose not supported since maven-dependency-plugin 3.0
[INFO] job:jar:1.0-SNAPSHOT
[INFO] - info.batey.kafka:kafka-unit:jar:1.0:test
[INFO] - commons-io:commons-io:jar:1.3.2:compile
[INFO] --------------------------------------------------

This version of commons-io is causing unit tests to fail.

Licensing terms

Would it be possible to add a license to this project? Apache 2.0 would be ideal. We would like to use it, but cannot proceed without clear licensing terms. Cheers.

A way to reset received events

I'm using this library to check for events published as messages during integration tests. The problem, however, is that in many tests I'm not interested in all of the published events, only those published from a specific point onward.

readMessages does not provide a way to set a position or skip events up to a specific point. It would be great if it were possible to reset the received events, or to move the consumer to the end of the messages at some point, so that the next read returns only the messages produced after that.

Currently, to work around this, I'm using a consumer in a separate thread that receives all the events into a list, which can be cleared at any time so that future events arrive in an empty list.
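
A minimal sketch of that kind of workaround, using the standard Kafka consumer API (the helper class below is hypothetical, not part of kafka-unit):

// Hypothetical helper: a background consumer that collects record values into a clearable list
class ClearableCollector implements Runnable {
    private final List<String> received = Collections.synchronizedList(new ArrayList<>());
    private final Properties consumerProps;
    private final String topic;
    private volatile boolean running = true;

    ClearableCollector(Properties consumerProps, String topic) {
        this.consumerProps = consumerProps;
        this.topic = topic;
    }

    @Override
    public void run() {
        // create and use the consumer on the polling thread only, since KafkaConsumer is not thread-safe
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (running) {
                for (ConsumerRecord<String, String> record : consumer.poll(100)) {
                    received.add(record.value());
                }
            }
        }
    }

    // Forget everything seen so far; subsequent reads only contain newer events
    void reset() {
        received.clear();
    }

    List<String> events() {
        return new ArrayList<>(received);
    }

    void stop() {
        running = false;
    }
}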

Usage: Error while starting kafka unit server

I am trying to use the kafka-unit Maven package to verify whether I can use an embedded Kafka server for testing. However, I am getting the following error:

2019-04-18 10:52:51,581 FATAL KafkaServer: Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Base64
	at kafka.utils.CoreUtils$.generateUuidAsBase64(CoreUtils.scala:286)
	at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:356)
	at kafka.server.KafkaServer$$anonfun$getOrGenerateClusterId$1.apply(KafkaServer.scala:356)
	at scala.Option.getOrElse(Option.scala:121)
	at kafka.server.KafkaServer.getOrGenerateClusterId(KafkaServer.scala:356)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:197)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
	at info.batey.kafka.unit.KafkaUnit.startup(KafkaUnit.java:149)
	at com.concentricai.dbaggregator.TestDBAggregatorInt.setUp(TestDBAggregatorInt.java:15)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.Base64
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 32 more
2019-04-18 10:52:51,582 INFO  KafkaServer: shutting down
2019-04-18 10:52:51,583 DEBUG KafkaScheduler: Shutting down task scheduler.
2019-04-18 10:52:51,584 DEBUG ZkClient: Closing ZkClient...
2019-04-18 10:52:51,584 INFO  ZkEventThread: Terminate ZkClient event thread.
2019-04-18 10:52:51,584 DEBUG ZkConnection: Closing ZooKeeper connected to localhost:56651
2019-04-18 10:52:51,584 DEBUG ZooKeeper: Closing session: 0x16a3194694d0000

Here is my test code

import info.batey.kafka.unit.KafkaUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestDBAggregatorInt {

    KafkaUnit kafkaUnitServer;

    @Before
    public void setUp() throws Exception {
        kafkaUnitServer = new KafkaUnit();
        kafkaUnitServer.startup();
    }

    @After
    public void tearDown() {
        kafkaUnitServer.shutdown();
    }

    @Test
    public void testKafkaCreateTopic() {
        kafkaUnitServer.createTopic("test");
    }

}

And I have added the kafka-unit dependency in my pom.xml as follows:

        <dependency>
            <groupId>info.batey.kafka</groupId>
            <artifactId>kafka-unit</artifactId>
            <version>1.0</version>
            <scope>test</scope>
        </dependency>

Support Scala 2.10

Would like to use this in a project that requires Scala 2.10. kafka-unit uses org.apache.kafka:kafka_2.11; I need org.apache.kafka:kafka_2.10 for my projects.

compile('org.apache.kafka:kafka_2.11:0.8.2.2')

This dependency conflict causes fatal errors during KafkaServer startup:

Feb 22 2016 09:12:41.572 (ScalaTest-run:kafka.utils.Logging$class:fatal:116) FATAL - [Kafka Server 1], Fatal error during KafkaServer startup. Prepare to shutdown
java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;

I don't know the best way to proceed to support multiple Scala versions, but I can try a pull request if given the right direction. I was in the middle of writing a similar framework and would prefer to contribute here than duplicate this work.

Error Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic:

Hi, first of all thanks for the work on making it easy to test Kafka.
I'm using this package, but I can't get this simple test to work:

package com.the_ica.cli
import info.batey.kafka.unit.{ KafkaUnit, KafkaUnitRule }
import kafka.producer.KeyedMessage


object KafkaTest {
  def main(args: Array[String]): Unit = {
    val kafkaUnitServer = new KafkaUnit(2181, 9092)
    val testTopic = "topic"
    kafkaUnitServer.startup()
    kafkaUnitServer.createTopic(testTopic)
    val keyedMessage = new KeyedMessage(testTopic, "key", "value")
    kafkaUnitServer.sendMessages(keyedMessage)
    val results = kafkaUnitServer.readMessages(testTopic, 1)
    println(results)
  }
}

The full error is

Created topic "topic".
ERROR - k.p.a.DefaultEventHandler - Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: topic
ERROR - k.n.Processor - Closing socket for /127.0.0.1 because of error

NoSuchMethod

Hi there.
I'm getting an unusual problem when testing within my Eclipse IDE:
java.lang.NoSuchMethodError: org.apache.kafka.common.protocol.SecurityProtocol.nonTestingValues()Ljava/util/Set;
at kafka.server.KafkaConfig$.<init>(KafkaConfig.scala:443)
at kafka.server.KafkaConfig$.<clinit>(KafkaConfig.scala)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:714)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:716)
at info.batey.kafka.unit.KafkaUnit.startup(KafkaUnit.java:125)
at info.batey.kafka.unit.KafkaUnitRule.before(KafkaUnitRule.java:36)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

What makes this more unusual is that when I build/compile the kafka-unit project in my IDE and link it directly, the problem goes away, meaning I can't simply reference the dependency in my pom.

Is this library still active?

Is this library still active? There have been a few PRs and a few Kafka releases since the last commit. Are there plans to continue working on this?

By the number of forks this library has seen, it looks like it's something the community could pick up.

Issue with camel-kafka implementation

Hi,

I have a camel-kafka implementation that works perfectly with standalone Kafka, but I'm trying to use the embedded broker for integration test cases. I'm able to start the embedded server and publish messages, but the messages are not being consumed from Kafka.

Thanks,
Aniket

Cannot run one of your tests

I get the following error when trying to create a topic:

ERROR DefaultEventHandler: Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: TestTopic

Kafka unit - Exception

Recently kafka-unit has started crashing the JVM:

java.lang.AssertionError: Test attempted to crash JVM with code 1
	at java.lang.SecurityManager.checkExit(SecurityManager.java:761)
	at java.lang.Runtime.exit(Runtime.java:107)
	at java.lang.System.exit(System.java:971)
	at org.apache.kafka.common.utils.Exit$2.execute(Exit.java:39)
	at org.apache.kafka.common.utils.Exit.exit(Exit.java:51)
	at kafka.utils.Exit$.exit(Exit.scala:28)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:43)
	at info.batey.kafka.unit.KafkaUnit.startup(KafkaUnit.java:149)

I have the following dependencies in my project:

info/batey/kafka:kafka-unit-1.0.jar
org/apache/kafka:kafka-clients-1.0.0.jar

Any help in this regard will be greatly appreciated.
Thanks in advance!
