
KoP's Introduction

Kafka-on-Pulsar (KoP)

Note:

KoP is now archived. It's recommended to try KSN (Kafka on StreamNative), see https://docs.streamnative.io/docs/kafka-on-cloud

KoP (Kafka on Pulsar) brings the native Apache Kafka protocol support to Apache Pulsar by introducing a Kafka protocol handler on Pulsar brokers. By adding the KoP protocol handler to your existing Pulsar cluster, you can migrate your existing Kafka applications and services to Pulsar without modifying the code. This enables Kafka applications to leverage Pulsar’s powerful features, such as:

  • Streamlined operations with enterprise-grade multi-tenancy
  • Simplified operations with a rebalance-free architecture
  • Infinite event stream retention with Apache BookKeeper and tiered storage
  • Serverless event processing with Pulsar Functions

KoP, implemented as a Pulsar protocol handler plugin with the protocol name "kafka", is loaded when the Pulsar broker starts. It lowers the barrier for adopting Pulsar by providing native Kafka protocol support on Apache Pulsar. By integrating two popular event streaming ecosystems, KoP unlocks new use cases: you can leverage the advantages of each ecosystem and build a truly unified event streaming platform with Apache Pulsar to accelerate the development of real-time applications and services.
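As a quick illustration, an unmodified Kafka Java producer only needs its bootstrap.servers pointed at a KoP-enabled broker; the sketch below assumes KoP's default Kafka listener on port 9092 and a hypothetical topic name.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
// Pulsar broker with the KoP protocol handler listening on its Kafka port
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // The message lands in the Pulsar topic backing "my-topic"
    producer.send(new ProducerRecord<>("my-topic", "key", "hello from a Kafka client"));
}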

KoP implements the Kafka wire protocol on Pulsar by leveraging components Pulsar already provides, such as topic discovery, the distributed log library (ManagedLedger), and cursors.

The following figure illustrates how the Kafka-on-Pulsar protocol handler is implemented within Pulsar.

Version compatibility

KoP version x.y.z.m conforms to Pulsar version x.y.z, where m is KoP's own patch number. KoP might also be compatible with older patch versions, but this is not guaranteed. See upgrade.md for details.

KoP is compatible with Kafka clients 0.9 or higher. For Kafka clients 3.2.0 or higher, you must add the following configurations to KoP because of KIP-679:

kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
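For context, KIP-679 made Kafka producers idempotent by default (enable.idempotence=true), and idempotent producers need the transaction coordinator and broker deduplication enabled above. If you control the clients, a possible alternative is to disable idempotence per producer instead:

enable.idempotence=false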

How to use KoP

You can configure and manage KoP based on your requirements. Check the following guides for more details.

Project Maintainers

License

This library is licensed under the terms of the Apache License 2.0 and may include packages written by third parties which carry their own copyright notices and license terms.

About StreamNative

Founded in 2019 by the original creators of Apache Pulsar, StreamNative is one of the leading contributors to the open-source Apache Pulsar project. We have helped engineering teams worldwide make the move to Pulsar with StreamNative Cloud, a fully managed service to help teams accelerate time-to-production.

KoP's People

Contributors

aloyszhang, anonymitaet, bewaremypower, codelipenghui, demogorgon314, dependabot[bot], dockerzhang, eolivelli, gaoran10, hangc0276, jennifer88huang-zz, jiazhai, lhotari, lifepuzzlefun, lordcheng10, michaeljmarshall, nicoloboschi, pierrez, sijie, snyk-bot, sundapeng1, wangjialing218, wenbingshen, wolfstudy, wuzhanpeng, yaalsn, yuanboliu, zhouxy0809, ziyaowei, zymap


KoP's Issues

[FEATURE] Make SSL authorization work in Pulsar's way

This is related to the SSL authentication implemented in #45. After #45, users can enable authentication using SSL, but authorization is not aligned with Pulsar, and most users want both authentication and authorization to work.

For SSL auth, we should try to map the certificate CN to a Pulsar role and leverage Pulsar's existing authorization mechanism; then the pulsar-admin and Pulsar IO paths would be authorized as well.

Currently, all of KoP's auth against the Pulsar side goes through the Pulsar admin client and Pulsar client (both obtained directly from PulsarService). This isolates KoP auth from Pulsar auth, so this task is not urgent.
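A hypothetical sketch of the CN-to-role mapping described above, using only standard JDK APIs; roleFromSession is an illustrative name, not an existing KoP method.

import java.security.cert.X509Certificate;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;
import javax.net.ssl.SSLSession;

// Extract the CN from the client certificate and treat it as the Pulsar role,
// which Pulsar's existing authorization provider can then evaluate.
static String roleFromSession(SSLSession session) throws Exception {
    X509Certificate cert = (X509Certificate) session.getPeerCertificates()[0];
    LdapName dn = new LdapName(cert.getSubjectX500Principal().getName());
    for (Rdn rdn : dn.getRdns()) {
        if ("CN".equalsIgnoreCase(rdn.getType())) {
            return rdn.getValue().toString(); // e.g. CN=admin -> role "admin"
        }
    }
    return null; // no CN found: treat as anonymous
}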

[BUG] User reports GetOffset not returning the right value.

Currently, after we create a partitioned topic that has no producer or consumer on it, running the command
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic topic8 --time -1
returns
"Topic topic8 does not exist".

[BUG] NPE while enabling broker publish throttling.

Describe the bug
When setting

brokerPublisherThrottlingMaxByteRate

the following error occurs:

08:38:34.452 [pulsar-broker-publish-rate-limiter-monitor-30-1] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
java.lang.NullPointerException: null
        at org.apache.pulsar.broker.service.ServerCnx.enableCnxAutoRead(ServerCnx.java:1605) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.AbstractTopic.lambda$enableProducerRead$5(AbstractTopic.java:295) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707) ~[?:1.8.0_242]
        at org.apache.pulsar.broker.service.AbstractTopic.enableProducerRead(AbstractTopic.java:295) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.AbstractTopic.resetBrokerPublishCountAndEnableReadIfRequired(AbstractTopic.java:286) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.BrokerService.lambda$refreshBrokerPublishRate$39(BrokerService.java:1138) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$40(BrokerService.java:1148) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.0.jar:2.5.0]
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.BrokerService.refreshBrokerPublishRate(BrokerService.java:1138) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
        at org.apache.pulsar.broker.service.BrokerService.lambda$setupBrokerPublishRateLimiterMonitor$9(BrokerService.java:509) ~[org.apache.pu

This is caused by the mock Producer that we registered for bundle load/unload handling.

    /**
     * it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
     */
    protected void enableProducerRead() {
        if (producers != null) {
            producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
        }
    }


    public void enableCnxAutoRead() {
        // we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires
        // pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if
        // throttling is enable on the topic. so, avoid pendingSendRequest check will be fine.
        if (!ctx.channel().config().isAutoRead() && autoReadDisabledRateLimiting) {
            // Resume reading from socket if pending-request is not reached to threshold
            ctx.channel().config().setAutoRead(true);
            // triggers channel read
            ctx.read();
            autoReadDisabledRateLimiting = false;
        }
    }

The code can fail at ctx.channel().config().isAutoRead() because the mock producer's connection has no real channel. We need to provide overriding implementations for InternalProducer and InternalServerCnx, along the lines of the sketch below.
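A minimal sketch of that fork, assuming InternalServerCnx extends Pulsar's ServerCnx and that the throttling hooks are overridable; the mock producer has no real Netty channel, so the overrides simply do nothing.

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;

public class InternalServerCnx extends ServerCnx {

    public InternalServerCnx(PulsarService pulsar) {
        super(pulsar);
    }

    @Override
    public void enableCnxAutoRead() {
        // no-op: there is no socket to resume reading from,
        // so ctx.channel() would be null here
    }

    @Override
    public void disableCnxAutoRead() {
        // no-op: nothing to throttle on the mock connection
    }
}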

[FEATURE] Add support for APIVersions v3

Is your feature request related to a problem? Please describe.
This commit in librdkafka removes support for older versions of APIVersions and uses only v3. Since APIVersions is almost the first frame a client sends, we need to support it to stay compatible with future librdkafka versions.

Describe the solution you'd like
We need to bump the kafka-clients version to at least 2.4, as encoding and decoding are handled through this dependency.

KafkaOnPulsarVersion support

This is similar to the PulsarVersion class that Pulsar provides; we could take the same approach to expose the build and GitHub info, as sketched below.
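A minimal sketch of what that could look like, assuming a kop-version.properties resource written at build time (for example by a git-commit-id Maven plugin); the resource name and keys are placeholders.

import java.io.InputStream;
import java.util.Properties;

public final class KafkaOnPulsarVersion {
    private static final Properties PROPS = load();

    public static String getVersion() {
        return PROPS.getProperty("version", "unknown");
    }

    public static String getGitSha() {
        return PROPS.getProperty("git.commit.id", "unknown");
    }

    private static Properties load() {
        Properties p = new Properties();
        try (InputStream in = KafkaOnPulsarVersion.class
                .getResourceAsStream("/kop-version.properties")) {
            if (in != null) {
                p.load(in);
            }
        } catch (Exception e) {
            // fall through and report "unknown"
        }
        return p;
    }
}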

Remove the readme file from releases after the platform is released.

Currently this repo is not open sourced, so the readme needs somewhere to be found; that's why the release workflow (.github/workflows/release.yml) includes it.

We can remove this once the repo is open sourced, or once the readme is hosted somewhere else.

[BUG] Provide a cache mechanism for KafkaTopicManager and KafkaTopicConsumerManager

Describe the bug
KafkaTopicManager holds the set of on-going topics; it needs a cache mechanism to evict invalid or old entries.

We could then use KafkaTopicManager as a cache for MetadataCache (in KafkaApis.scala); one possible shape is sketched below.
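A minimal sketch of the eviction policy, using a Guava cache with time-based expiry; the field name, TTL, and size bound are illustrative, not KoP's actual values.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;

// Topics unused for 5 minutes are evicted, so references to unloaded or
// deleted topics cannot accumulate in KafkaTopicManager indefinitely.
Cache<String, CompletableFuture<PersistentTopic>> topics = CacheBuilder.newBuilder()
        .expireAfterAccess(5, TimeUnit.MINUTES)
        .maximumSize(10_000)
        .build();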


[BUG] The Kafka 2.4.0 Java producer fails with: produce exception Topic x not present in metadata

The 2.0.0 Java API works fine.
Example code (lightly cleaned up; bidLog, topic, and ProducerCallback come from the reporter's application):
example code:

props.setProperty("bootstrap.servers", "kf17ss.jx.shbt.qihoo.net:9092");
    props.setProperty("compression.type", "lz4");
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.setProperty("retries", "3");

  int logCount = 100;

    for (int i = 0; i < logCount; i++) {
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, bidLog.getBytes());
      // should always return true, if false, LogProducer refuse to service
      logProducer.send(record, new ProducerCallback(record));
    }

for (int i = 0; i < logCount; i++) {
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, bidLog.getBytes());
      // should always return true, if false, LogProducer refuse to service
      logProducer.send(record, new ProducerCallback(record));
    }   

[BUG] Force topic delete is not working when the topic has active consumers/producers.

Describe the bug
Force topic delete is not working when the topic has active consumers or producers.

To Reproduce
Steps to reproduce the behavior (the original report included a screenshot for each step):

  1. Go inside the bastion pod.
  2. Create a partitioned topic using the pulsar-admin client.
  3. Create an active consumer for the above topic in a new terminal.
  4. Force delete the topic while the consumer is actively running in the other terminal.
  5. List the topics to check whether the topic was deleted, and observe that it is still listed.
  6. Also try creating the topic again and observe an error.

Expected behavior
The topic should get deleted, and topic creation after deletion should work.

This issue is easily reproducible; please let me know if more info or logs are required.

[FEATURE] Support different compression types

Currently we do not handle compression while producing and fetching messages. It would be good to add support for this feature.
The message handling is mostly in the MessageRecordUtils class; see the client-side illustration below.
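For reference, compression is chosen by the client, so KoP has to handle whatever arrives. A producer requests it like this (a client-side illustration only, not a KoP change):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// Kafka clients support: none, gzip, snappy, lz4 (and zstd since 2.1).
// KoP would need to decompress such batches when converting them to
// Pulsar messages, and optionally recompress on fetch.
props.setProperty("compression.type", "lz4");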

Use the binary protocol to find the broker, to improve performance

Is your feature request related to a problem? Please describe.
In the metadata request implementation, we use the admin API to find the broker for a topic:

String broker = kafkaService.getAdminClient().lookups().lookupTopic(topic.toString());

Changing to the binary protocol would improve performance (a sketch follows).
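A hedged sketch of the change, assuming access to the broker's internal PulsarClientImpl (getPulsarClient() is a hypothetical accessor); its LookupService speaks the binary protocol on the broker port instead of the admin REST API.

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;

// One binary-protocol round trip, no HTTP overhead.
PulsarClientImpl client = (PulsarClientImpl) kafkaService.getPulsarClient();
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> broker =
        client.getLookup().getBroker(TopicName.get(topic.toString()));
// 'broker' completes with the logical and physical addresses of the owner.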

[BUG] Poor performance with KoP

Describe the bug
When testing KoP with the kafka-producer-perf tool, performance is very poor:

$ bin/kafka-producer-perf-test.sh  --topic kop-test --producer.conf config/producer.properties  --throughput -1 --num-records 100000000 --record-size 1024
4036 records sent, 804.8 records/sec (0.79 MB/sec), 2349.0 ms avg latency, 4712.0 max latency.
3735 records sent, 745.2 records/sec (0.73 MB/sec), 7095.8 ms avg latency, 9678.0 max latency.
3600 records sent, 718.0 records/sec (0.70 MB/sec), 12010.2 ms avg latency, 14667.0 max latency.
3645 records sent, 726.7 records/sec (0.71 MB/sec), 17134.9 ms avg latency, 19659.0 max latency.

while testing with pulsar-perf, performance is much higher:

$ bin/pulsar-perf produce -threads 1 -u pulsar://100.76.43.216:6650 -o 10000 -n 4 -b 0 -bm 0 -s 4096 -r 3000000 public/default/kop-test
09:55:10.009 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:  15882.3  msg/s ---    496.3 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 337.086 ms - med: 236.380 - 95pct: 784.219 - 99pct: 861.987 - 99.9pct: 870.187 - 99.99pct: 870.439 - Max: 870.539
09:55:20.100 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:  15703.9  msg/s ---    490.7 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 1645.813 ms - med: 521.495 - 95pct: 10822.271 - 99pct: 10832.767 - 99.9pct: 10838.975 - 99.99pct: 10839.359 - Max: 10839.359
09:55:30.162 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:  15777.3  msg/s ---    493.0 Mbit/s --- failure      0.0 msg/s --- Latency: mean: 1764.075 ms - med: 676.243 - 95pct: 10684.863 - 99pct: 10744.063 - 99.9pct: 10747.455 - 99.99pct: 10749.183 - Max: 10749.247

To Reproduce
Just run kafka-producer-perf-test against a KoP-enabled Pulsar cluster.

Expected behavior
For Kafka clients, KoP should provide performance the same as or close to the native Pulsar client.

Add a unit test framework

Is your feature request related to a problem? Please describe.
We should be able to start a mock Pulsar (KoP) broker and run the unit tests against it.

[BUG] an error in OffsetAcker

22:55:17.650 [ForkJoinPool.commonPool-worker-13] WARN  io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker - Error when get consumer for offset ack:
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Topic reader cannot be created on a partitioned topic
	at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_131]
	at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doCreateReaderAsync$14(PulsarClientImpl.java:483) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
	at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$null$6(BinaryProtoLookupService.java:167) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
	at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
	at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:516) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:120) ~[org.apache.pulsar-pulsar-common-2.5.0-ad0224407.jar:2.5.0-ad0224407]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: org.apache.pulsar.client.api.PulsarClientException: Topic reader cannot be created on a partitioned topic
	... 37 more

Add Kafka node.id support

Is your feature request related to a problem? Please describe.
Each Kafka broker has a node.id registered in ZooKeeper, and this id is used in various places.
We will try to avoid creating it; if that is not possible, we will have to create a ZooKeeper node for it.

[BUG] Integration tests not working on OSX

Describe the bug
Integration tests are failing on OSX.

To Reproduce
Steps to reproduce the behavior:

  1. use OSX
  2. mvn clean test

error Error: connect ECONNREFUSED 127.0.0.1:15014
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1137:16) {
  errno: -111,
  code: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.0.1',
  port: 15014
}
ExitCode=1

Expected behavior
Should not fail

Additional context
We are running containers in host mode, which means the port should be available on both sides. The KoP broker is started on the host, and the tests run in Docker containers.

While this works on Linux, where there are no VMs, on OSX Testcontainers forces us to use the host.testcontainers.internal hostname to communicate with the host, which creates a problem because we cannot easily advertise this address.

Proposal

In order to quickly merge the integration tests with those in apache/pulsar, I think we should stay closer to what is done in apache/pulsar. We could:

  • bump Pulsar's version in testcontainers to rely directly on the official PulsarContainer object,
  • make PulsarContainer the de-facto way to test protocol handlers by making them first-class citizens,
  • revamp IntegrationsTest to use PulsarContainer with KoP (see the sketch below).
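A minimal sketch of that last point, using the official Testcontainers Pulsar module (which already handles host address advertisement on macOS); the image tag is illustrative, and a KoP-enabled image would be needed in practice.

import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

try (PulsarContainer pulsar = new PulsarContainer(
        DockerImageName.parse("apachepulsar/pulsar:2.8.0"))) {
    pulsar.start();
    String serviceUrl = pulsar.getPulsarBrokerUrl(); // pulsar://host:mapped-port
    String httpUrl = pulsar.getHttpServiceUrl();     // http://host:mapped-port
    // point the integration tests at serviceUrl / httpUrl here
}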

What do you think?

[BUG] In the current CommandDecoder, the request order may change under a race condition

In the current request handler, we pass in the request and, after some handling, return a responseFuture.
If several requests come in at the same time, differences in handling time can cause the response futures to complete in a different order than the commands arrived, which changes the request-handling order.
This error is seen in KafkaMessageOrderTest:

  • the client sends 10 messages, with the first 9 batched together and the last one not batched;
  • both requests come into handleProduceRequest, but the second request returns earlier than the first (one possible fix is sketched below).
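One way to preserve ordering, sketched with illustrative names (KafkaRequest, KafkaResponse, handle, writeResponse, channelEventLoop) and assuming everything runs on the channel's single Netty event loop: queue the response futures in arrival order and only ever flush the head.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

private final Queue<CompletableFuture<KafkaResponse>> pending = new ArrayDeque<>();

void onRequest(KafkaRequest request) {
    CompletableFuture<KafkaResponse> future = handle(request); // async handling
    pending.add(future);                                       // arrival order
    future.thenRunAsync(this::flushHead, channelEventLoop);    // completion hook
}

private void flushHead() {
    // Only the head may be written; a completed non-head response waits
    // until everything before it has been flushed.
    while (!pending.isEmpty() && pending.peek().isDone()) {
        writeResponse(pending.poll().join());
    }
}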

[BUG] ListOffset not working well in cluster mode


[FEATURE] Add transaction support

RFC: streamnative/community#2

Tasks

  • Transaction Coordinator
    • producer id manager #372
    • producer fence
    • transaction metadata #295
    • add partitions to transaction #295
    • end transaction #295
    • transaction marker channel handler #295
    • transaction log
  • Consumer Coordinator
    • transaction offset commit #365
    • write transaction marker #365
  • Topic Partition Leader Broker
    • find transaction coordinator #295
    • send transaction message #295
    • write transaction marker #295
    • read isolation #295
    • aborted index log
  • Metrics
  • Test
    • basic produce transaction messages and consume test (read_committed, read_uncommitted) #295
    • Kafka client test migrate

allowAutoTopicCreationType parameter in KoP broker.conf, when set to partitioned, does not create a partitioned topic when auto topic creation is enabled.

Describe the bug
With the following parameter settings in KoP's broker.conf, a non-partitioned topic is created when creating a consumer or producer for a non-existing topic using the Pulsar client.
Parameter configuration:

allowAutoTopicCreation=true
allowAutoTopicCreationType=partitioned
defaultNumPartitions=1

How to Reproduce

  1. Deploy KoP with the configuration mentioned above.
  2. Create a producer or consumer using the Pulsar client with a non-existing topic.
  3. Use topics list to check that a non-partitioned topic was created.

Expected Behaviour
The allowAutoTopicCreationType flag should be honoured when set to partitioned: a partitioned topic should be created when auto topic creation is enabled.

Additional Info
Attached kop logs.
kop0autotopic.log
kop1autotopic.log
kop2autotopic.log

[FEATURE] Kafka metadata cache support

Kafka's metadata cache contains all topics in the whole cluster, not only the topics on the local broker, so Kafka can easily use it to check whether a topic exists, for example in offsetCommit.

In Pulsar, could we use a ZooKeeper cache of all existing persistent topics to achieve this? A sketch of one option follows.
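A hedged sketch of one option, polling via the admin client for simplicity (a ZooKeeper child-watch cache would avoid the polling; the class name and refresh interval are illustrative):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;

public class TopicExistenceCache {
    private final ScheduledExecutorService refresher =
            Executors.newSingleThreadScheduledExecutor();
    private volatile Set<String> topics = Collections.emptySet();

    public TopicExistenceCache(PulsarAdmin admin, String namespace) {
        // Periodically swap in a fresh snapshot of the namespace's topics.
        refresher.scheduleAtFixedRate(() -> {
            try {
                topics = new HashSet<>(admin.topics().getList(namespace));
            } catch (Exception e) {
                // keep the previous (stale) view on transient failures
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    public boolean exists(String topic) {
        return topics.contains(topic); // e.g. for the offsetCommit path
    }
}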

[BUG] IllegalArgumentException when using KafkaConsumer#offsetsForTimes

Describe the bug
When trying KoP with Kafka 2.1.0, 2.2.0, and 2.3.0, invoking KafkaConsumer#offsetsForTimes throws the following exception:

Exception in thread "main" java.lang.IllegalArgumentException: Invalid negative timestamp
        at org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
        at org.apache.kafka.clients.consumer.internals.Fetcher.offsetsForTimes(Fetcher.java:500)
        at org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:2009)
        at org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1973)
        at ConsumerKafka.lambda$compatibleTest$0(ConsumerKafka.java:94)
        at ConsumerKafka.tryFeature(ConsumerKafka.java:179)
        at ConsumerKafka.compatibleTest(ConsumerKafka.java:93)
        at ConsumerKafka.main(ConsumerKafka.java:62)

To Reproduce

  1. Run Pulsar with KoP
  2. Start a Kafka consumer with client version 2.1.0, 2.2.0, or 2.3.0
  3. Invoke KafkaConsumer#offsetsForTimes

Expected behavior
KafkaConsumer#offsetsForTimes should work well

Additional context
While troubleshooting, I found that KoP always returns offset information with timestamp -1:

partitionData.complete(new ListOffsetResponse.PartitionData(
        Errors.NONE,
        RecordBatch.NO_TIMESTAMP, // -1
        MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())));

But the Kafka Java clients 2.1.0, 2.2.0, and 2.3.0 check this parameter and reject negative timestamps:

for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
                // 'entry.getValue().timestamp' will not be null since we are guaranteed
                // to work with a v1 (or later) ListOffset request
                ListOffsetData offsetData = entry.getValue();
                offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp,
                        offsetData.leaderEpoch));
            }

...

public OffsetAndTimestamp(long offset, long timestamp, Optional<Integer> leaderEpoch) {
        if (offset < 0)
            throw new IllegalArgumentException("Invalid negative offset");

        if (timestamp < 0)
            throw new IllegalArgumentException("Invalid negative timestamp");

        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }

This is the cause of the exception.
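A sketch of the shape of a fix: return the entry's publish time instead of RecordBatch.NO_TIMESTAMP. readPublishTime is an illustrative helper that would parse the MessageMetadata of the entry at finalPosition.

// Instead of RecordBatch.NO_TIMESTAMP (-1), answer with a real timestamp
// so OffsetAndTimestamp's negative-timestamp check passes on the client.
long publishTime = readPublishTime(finalPosition); // illustrative helper
partitionData.complete(new ListOffsetResponse.PartitionData(
        Errors.NONE,
        publishTime,
        MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())));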

[BUG] Failed to authenticate HTTP request: Authentication required

Describe the bug

When using KoP, configure standalone.conf as below:

saslAllowedMechanisms=PLAIN
 
# Configuration to enable authentication and authorization
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
 
# If using secret key
tokenSecretKey=file:///path/to/secret.key

Pulsar fails to start:

23:31:00.438 [pulsar-web-66-13] WARN  org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Failed to authenticate HTTP request: Authentication required
23:31:00.448 [pulsar-web-66-13] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [16/Apr/2020:23:31:00 +0800] "GET /admin/v2/clusters HTTP/1.1" 401 0 "-" "Pulsar-Java-v2.5.0-ad0224407" 34
23:31:00.472 [main] ERROR io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler - Failed to get retention policy for kafka metadata namespace public/__kafka
org.apache.pulsar.client.admin.PulsarAdminException$NotAuthorizedException: HTTP 401 Unauthorized

To Reproduce
Steps to reproduce the behavior:

  1. Download the StreamNative platform and follow the quick start. It works without enabling SASL:
./bin/pulsar standalone -a 127.0.0.1
  2. Follow the instructions in "Secure KoP" in the StreamNative documentation to enable authentication:
saslAllowedMechanisms=PLAIN

# Configuration to enable authentication and authorization
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken

# If using secret key
tokenSecretKey=file:///path/to/secret.key

Pulsar fails to start.
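The 401 suggests the broker's own internal admin client has no credentials. A likely fix, sketched with standard Pulsar settings (not KoP-specific; the token value is a placeholder), is to also configure the broker's client-side authentication in standalone.conf:

brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
brokerClientAuthenticationParameters=token:<admin-token>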

