Giter Club home page Giter Club logo

kafkaproxy's People

Contributors

alfsch avatar dajudge avatar dependabot[bot] avatar x29a avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

kafkaproxy's Issues

Cannot invoke consumer twice

ea68a78 has an issue when two consumers are started consecutively.

Steps to reproduce:

  1. Build a docker image from commit specified above and use it in example/docker-compose.yaml.
  2. Run the following commands:
# Start the test setup
docker-compose -f example/docker-compose.yml up -d
# Create the test topic
docker run --rm --net host -i confluentinc/cp-zookeeper:5.2.1 kafka-topics --create --topic my-test-topic --bootstrap-server localhost:4000 --partitions 1 --replication-factor 1
# Produce to test topic
echo "Hello, kafkaproxy" | docker run --rm --net host -i confluentinc/cp-zookeeper:5.2.1 kafka-console-producer --broker-list localhost:4000 --topic my-test-topic
# Consume 1st time
docker run --rm --net host -it confluentinc/cp-zookeeper:5.2.1 kafka-console-consumer --bootstrap-server localhost:4000 --topic my-test-topic --from-beginning --max-messages 1
# Consume 2nd time
docker run --rm --net host -it confluentinc/cp-zookeeper:5.2.1 kafka-console-consumer --bootstrap-server localhost:4000 --topic my-test-topic --from-beginning --max-messages 1
# The last command fails with a stack trace

The log from the console of the last command is:

[2020-04-02 09:52:22,066] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_versions': Error reading array of size 44, only 21 bytes available
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
        at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:308)
        at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:152)
        at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:811)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages

Support MSK IAM Auth

Would like to request support for MSK IAM Auth --> https://github.com/aws/aws-msk-iam-auth

I was trying to look through the code but it was not clear if the above is supported; did not see how to pass in more config that can be done using normal kafka client config.

Is there an easy way to pass the config for connection from proxy to broker as per article --> https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk/:

Then you specify the necessary Kafka properties:

ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler		

Unable to produce and consume and create topics

List topics is working fine. but when i try to execute the command below, the error comes up

./bin/kafka-console-producer.sh --broker-list localhost:4000,localhost:4001,localhost:4002 --topic test
>test
[2020-12-03 14:01:38,867] WARN [Producer clientId=console-producer] Connection to node 2 (localhost/127.0.0.1:4004) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:38,973] WARN [Producer clientId=console-producer] Connection to node 8 (localhost/127.0.0.1:4005) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,080] WARN [Producer clientId=console-producer] Connection to node 4 (localhost/127.0.0.1:4007) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,181] WARN [Producer clientId=console-producer] Connection to node 8 (localhost/127.0.0.1:4005) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,287] WARN [Producer clientId=console-producer] Connection to node 4 (localhost/127.0.0.1:4007) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,393] WARN [Producer clientId=console-producer] Connection to node 7 (localhost/127.0.0.1:4008) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

This is my compose file

---
version: '3'
services:
  kafkaproxy:
    image: dajudge/kafkaproxy:0.0.5
    ports:
      - 4000:4000
    restart: always
    network_mode: "host"
    environment:
      - KAFKAPROXY_HOSTNAME=localhost
      - KAFKAPROXY_BASE_PORT=4000
      - KAFKAPROXY_BOOTSTRAP_SERVERS=somewherein.kafka.ap-southeast-1.amazonaws.com:9092,somewherein.kafka.ap-southeast-1.amazonaws.com:9092,somewherein.kafka.ap-southeast-1.amazonaws.com:9092
      - KAFKAPROXY_LOG_LEVEL=DEBUG

My MSK cluster has 9 brokers, 3 in each AZ, should I add 9 in the bootrapservers?

Thanks!

Kafka truststore should be optional

Contrary to the documentation, the truststore used for validating kafka server certificates is not optional.

When KAFKAPROXY_KAFKA_SSL_ENABLED is set to true and KAFKAPROXY_KAFKA_SSL_TRUSTSTORE_LOCATION is not set, then the built-in truststore (taken from JDK during native compile) should be used. Right now an exception is raised instead.

Originally raised in #31

Support Kafka 2.8?

Trying to connect to a cluster running 2.8 I get the following error:

18:48:12 ERROR [co.da.pr.RelayingChannelInboundHandler] (nioEventLoopGroup-4-2) [78e9fbd6-a1ee-4bb6-b901-d9b8bfdfab99] Exception in upstream channel.: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 11
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:343)
at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:317)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:325)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.lambda$responseBody$2(KafkaMessage.java:90)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.withPayload(KafkaMessage.java:100)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.responseBody(KafkaMessage.java:87)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewriteMessageBody(BaseReflectingRewriter.java:55)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewriteMessage(BaseReflectingRewriter.java:49)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewrite(BaseReflectingRewriter.java:41)
at com.dajudge.kafkaproxy.protocol.rewrite.CompositeRewriter.rewrite(CompositeRewriter.java:40)
at com.dajudge.kafkaproxy.protocol.KafkaRequestStore.process(KafkaRequestStore.java:66)
at com.dajudge.kafkaproxy.protocol.RewritingKafkaMessageDuplexHandler.channelRead(RewritingKafkaMessageDuplexHandler.java:39)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at com.dajudge.kafkaproxy.protocol.DecodingKafkaMessageInboundHandler.onMessageComplete(DecodingKafkaMessageInboundHandler.java:26)
at com.dajudge.kafkaproxy.protocol.DecodingKafkaMessageInboundHandler.onMessageComplete(DecodingKafkaMessageInboundHandler.java:23)
at com.dajudge.proxybase.AbstractChunkedMessageStreamInboundHandler.lambda$channelRead$0(AbstractChunkedMessageStreamInboundHandler.java:33)
at com.dajudge.proxybase.ChunkedMessageCollector.append(ChunkedMessageCollector.java:39)
at com.dajudge.proxybase.AbstractChunkedMessageStreamInboundHandler.channelRead(AbstractChunkedMessageStreamInboundHandler.java:33)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at com.dajudge.proxybase.LoggingContextHandler.lambda$channelRead$0(LoggingContextHandler.java:40)
at com.dajudge.proxybase.LogHelper.withChannelId(LogHelper.java:33)
at com.dajudge.proxybase.LoggingContextHandler.channelRead(LoggingContextHandler.java:38)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:834)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:517)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

Looking at a similar proxy seems like the issue might be lack of support for version 2.8 of Kafka.

grepplabs/kafka-proxy@5c0fd2f

Unable to connect with Kafka Cluster while kafka-topics.sh can using security.protocol=SSL

Hi,

I was trying to use Kafkaproxy for AWS MSK.
I was running Kafkaproxy from a EC2 deployed in the same VPC as that of AWS MSK.
AWS MSK has Unauthenticated access enabled and following auth schemes disabled:

  • TLS client authentication through AWS Certificate Manager (ACM)
  • SASL/SCRAM authentication
  • IAM role-based authentication

I can connect to AWS MSK via kafka-topics.sh using security.protocol=SSL from EC2. AWS MSK has Kafka brokers that can be connected only on port 9094.
I have tried KAFKAPROXY_KAFKA_SSL_ENABLED=true flag as well but it didn't work.
It would be great if a little guidance is provided

Kafka proxy getting "Failed to initialize a channel" error.

I am trying to run a kafka cluster(v3.0.0) with 3 kafka replicas and 1 zookeeper. On top it we are using the Kafka proxy to setup ingress to allow remote access to kafka cluster. The config is as follows:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkaproxy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: kafkaproxy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: kafkaproxy
    spec:
      containers:
      - env:
        - name: KAFKAPROXY_HOSTNAME
          value: kafkaproxy.test.com
        - name: KAFKAPROXY_BASE_PORT
          value: "xxxx"
        - name: KAFKAPROXY_BOOTSTRAP_SERVERS
          value: kafka-kafka-bootstrap.kafka-test.svc:9092
        image: dajudge/kafkaproxy:0.0.18
        imagePullPolicy: IfNotPresent
        name: sampleproxy
        ports:
        - containerPort: 4000
          protocol: TCP
        resources:
           limits:
             cpu: 500m
             memory: 2Gi
           requests:
             cpu: 500m
             memory: 1Gi
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsUser: 0
      terminationGracePeriodSeconds: 30

I have also created a service for the proxy as follows:

apiVersion: v1
kind: Service
metadata:
  name: kafkaproxy
spec:
  ports:
  - name: bootstrap
    port: xxxx
    protocol: TCP
    targetPort: xxxx
  - name: broker-0
    port: xxxx+1
    protocol: TCP
    targetPort: xxxx+1
  - name: broker-1
    port: xxxx+2
    protocol: TCP
    targetPort: xxxx+2
  - name: broker-2
    port: xxxx+3
    protocol: TCP
    targetPort: xxxx+3
  selector:
    app: kafkaproxy
  sessionAffinity: None
  type: ClusterIP

When we are trying to send messages to Kafka cluster using the kafka-proxy hostname and port (kafkaproxy.test.com:xxxx) , We are getting errors as it is not able to connect to Kafka bootstrap service and is unable to get brokers to connect.

The error logs in Kafka proxy are:

        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

14:06:27 WARN  [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-3) [] Failed to initialize a channel. Closing: [id: 0x7fc14273, L:/10.233.97.180:5060 - R:/100.65.159.39:52216]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-1.kafka-kafka-brokers.kafka1-test.svc/10.233.82.82:9092
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

14:06:27 WARN  [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-2) [] Failed to initialize a channel. Closing: [id: 0xbfcc2bd1, L:/10.233.97.180:5058 - R:/10.233.76.0:38834]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-0.kafka-kafka-brokers.kafka-test.svc/10.233.82.81:9092
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

14:06:43 WARN  [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-4) [] Failed to initialize a channel. Closing: [id: 59.39:42290]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-2.kafka-kafka-brokers.kafka1
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

14:06:57 WARN  [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-3) [] Failed to initialize a channel. Closing: [id: 5.0:32816]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-2.kafka-kafka-brokers.kafka1-n
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

14:06:57 WARN  [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-2) [] Failed to initialize a channel. Closing: [id: 5.0:45758]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-1.kafka-kafka-brokers.kafka1-n
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
        at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
        at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:829)
        at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
        at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)

It is getting disconnected with Kafka bootstrap even if we are flushing in messages continuously thrugh the producer.

msk connection not working via SASL_SSL

Hello,
I am trying to use the proxy in-front of SASL_SSL MSK brokers.
For some reason I am getting:
terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue

Any idea why ?

Connection Problem with Kafka 3 and Java Client

Hi,

when trying to connect to a Kafka 3 broker via the kafkaproxy with the Java Kafka Library in version greater than 2.8.1, consume does not work because it seems that the kafkaproxy is announcing the real brokers hostname (kafka1) instead of itself (e.g. localhost).

I wrote a MRE in this branch which updates the brokers and then uses the Java Kafka Library to produce 10 messages to a topic and then tries to consume them.

Producing works, when consuming one can see with DEBUG loglevel: java.net.UnknownHostException: kafka1 instead of the expected localhost.

Thanks for the great product!

Implement integration testing for different client libraries.

Kafkaproxy is implemented in java, which makes testing java clients very easy. But there's many different client libraries out there, so we need to implement integration testing for other client libraries, as well.

For starters implement a mechanism w/ docker to support testing arbitrary client libraries and use it w/ the following technologies to prove the concept is working:

  • Java
  • .net

Provide native binary of kafkaproxy for cloud native usage

Currently the docker images are graalvm based. These images are very huge and memory consuming. Also the startup-time with about one second is very high for cloud native usage.

Since kafkaproxy app is already based on the quarkus framework it's possible to create a native image to reduce the memory consumption about ten times. Also the startup times could be in regions about 50 ms.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.