dajudge / kafkaproxy

kafkaproxy is a reverse proxy for the wire protocol of Apache Kafka.
License: Apache License 2.0
Netty reports unreleased buffers upon shutdown.
Listing topics works fine, but when I try to execute the command below, the error comes up:
./bin/kafka-console-producer.sh --broker-list localhost:4000,localhost:4001,localhost:4002 --topic test
>test
[2020-12-03 14:01:38,867] WARN [Producer clientId=console-producer] Connection to node 2 (localhost/127.0.0.1:4004) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:38,973] WARN [Producer clientId=console-producer] Connection to node 8 (localhost/127.0.0.1:4005) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,080] WARN [Producer clientId=console-producer] Connection to node 4 (localhost/127.0.0.1:4007) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,181] WARN [Producer clientId=console-producer] Connection to node 8 (localhost/127.0.0.1:4005) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,287] WARN [Producer clientId=console-producer] Connection to node 4 (localhost/127.0.0.1:4007) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2020-12-03 14:01:39,393] WARN [Producer clientId=console-producer] Connection to node 7 (localhost/127.0.0.1:4008) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
This is my compose file:
---
version: '3'
services:
  kafkaproxy:
    image: dajudge/kafkaproxy:0.0.5
    ports:
      - 4000:4000
    restart: always
    network_mode: "host"
    environment:
      - KAFKAPROXY_HOSTNAME=localhost
      - KAFKAPROXY_BASE_PORT=4000
      - KAFKAPROXY_BOOTSTRAP_SERVERS=somewherein.kafka.ap-southeast-1.amazonaws.com:9092,somewherein.kafka.ap-southeast-1.amazonaws.com:9092,somewherein.kafka.ap-southeast-1.amazonaws.com:9092
      - KAFKAPROXY_LOG_LEVEL=DEBUG
My MSK cluster has 9 brokers, 3 in each AZ. Should I add all 9 to the bootstrap servers?
Thanks!
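Judging from the WARN lines above (connections attempted to ports 4004-4008), kafkaproxy seems to allocate one local port per discovered broker above KAFKAPROXY_BASE_PORT. Under that assumption, a 9-broker cluster needs ports 4000 through 4009 reachable from the client, not just the three listed in --broker-list. A minimal sketch of the assumed mapping (not documented kafkaproxy behaviour):

```java
// Assumption based on the WARN lines above: the proxy serves bootstrap on
// KAFKAPROXY_BASE_PORT and maps broker n to basePort + n.
public class ProxyPorts {
    // Hypothetical helper: local proxy port for the n-th discovered broker.
    static int portForBroker(int basePort, int brokerNumber) {
        return basePort + brokerNumber;
    }

    public static void main(String[] args) {
        int basePort = 4000;
        // With 9 MSK brokers, all of these ports must be reachable.
        for (int broker = 1; broker <= 9; broker++) {
            System.out.println("expose " + portForBroker(basePort, broker));
        }
    }
}
```

With network_mode: "host" the ports: mapping in the compose file is ignored anyway, so the relevant check is whether the host firewall allows the whole range.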
Hello,
I am trying to use the proxy in front of SASL_SSL MSK brokers.
For some reason I am getting:
terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue
Any idea why ?
Contrary to the documentation, the truststore used for validating kafka server certificates is not optional.
When KAFKAPROXY_KAFKA_SSL_ENABLED is set to true and KAFKAPROXY_KAFKA_SSL_TRUSTSTORE_LOCATION is not set, the built-in truststore (taken from the JDK during native compilation) should be used. Right now an exception is raised instead.
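The intended fallback can be sketched in plain JSSE terms (illustrative only, not the actual kafkaproxy code): initializing a TrustManagerFactory with a null KeyStore makes the JDK fall back to its built-in cacerts truststore.

```java
import java.io.FileInputStream;
import java.io.InputStream;
import java.security.KeyStore;
import javax.net.ssl.TrustManagerFactory;

// Sketch of the desired behaviour: use a configured truststore when a
// location is given, otherwise fall back to the JDK default truststore
// instead of throwing.
public class TrustStores {
    static TrustManagerFactory trustManagers(String location) throws Exception {
        TrustManagerFactory tmf =
            TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
        if (location == null) {
            // null KeyStore => JSSE uses the JDK's built-in cacerts.
            tmf.init((KeyStore) null);
        } else {
            KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
            try (InputStream in = new FileInputStream(location)) {
                ks.load(in, null);
            }
            tmf.init(ks);
        }
        return tmf;
    }

    public static void main(String[] args) throws Exception {
        // No location configured: should still yield usable trust managers.
        System.out.println(trustManagers(null).getTrustManagers().length > 0);
    }
}
```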
Originally raised in #31
Trying to connect to a cluster running Kafka 2.8, I get the following error:
18:48:12 ERROR [co.da.pr.RelayingChannelInboundHandler] (nioEventLoopGroup-4-2) [78e9fbd6-a1ee-4bb6-b901-d9b8bfdfab99] Exception in upstream channel.: java.lang.IllegalArgumentException: Invalid version for API key METADATA: 11
at org.apache.kafka.common.protocol.ApiKeys.schemaFor(ApiKeys.java:343)
at org.apache.kafka.common.protocol.ApiKeys.responseSchema(ApiKeys.java:317)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:325)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.lambda$responseBody$2(KafkaMessage.java:90)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.withPayload(KafkaMessage.java:100)
at com.dajudge.kafkaproxy.protocol.KafkaMessage.responseBody(KafkaMessage.java:87)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewriteMessageBody(BaseReflectingRewriter.java:55)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewriteMessage(BaseReflectingRewriter.java:49)
at com.dajudge.kafkaproxy.protocol.rewrite.BaseReflectingRewriter.rewrite(BaseReflectingRewriter.java:41)
at com.dajudge.kafkaproxy.protocol.rewrite.CompositeRewriter.rewrite(CompositeRewriter.java:40)
at com.dajudge.kafkaproxy.protocol.KafkaRequestStore.process(KafkaRequestStore.java:66)
at com.dajudge.kafkaproxy.protocol.RewritingKafkaMessageDuplexHandler.channelRead(RewritingKafkaMessageDuplexHandler.java:39)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at com.dajudge.kafkaproxy.protocol.DecodingKafkaMessageInboundHandler.onMessageComplete(DecodingKafkaMessageInboundHandler.java:26)
at com.dajudge.kafkaproxy.protocol.DecodingKafkaMessageInboundHandler.onMessageComplete(DecodingKafkaMessageInboundHandler.java:23)
at com.dajudge.proxybase.AbstractChunkedMessageStreamInboundHandler.lambda$channelRead$0(AbstractChunkedMessageStreamInboundHandler.java:33)
at com.dajudge.proxybase.ChunkedMessageCollector.append(ChunkedMessageCollector.java:39)
at com.dajudge.proxybase.AbstractChunkedMessageStreamInboundHandler.channelRead(AbstractChunkedMessageStreamInboundHandler.java:33)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
at com.dajudge.proxybase.LoggingContextHandler.lambda$channelRead$0(LoggingContextHandler.java:40)
at com.dajudge.proxybase.LogHelper.withChannelId(LogHelper.java:33)
at com.dajudge.proxybase.LoggingContextHandler.channelRead(LoggingContextHandler.java:38)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:834)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:517)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
Looking at a similar proxy, it seems the issue might be a lack of support for Kafka version 2.8.
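A plausible explanation, sketched below as an assumption: the proxy parses broker responses with the kafka-clients library it was built against, and a Kafka 2.8 broker uses METADATA versions that build does not know, so ApiKeys.schemaFor() throws. The version numbers in the sketch are illustrative; one mitigation would be clamping the maximum versions the proxy relays in the ApiVersions response so clients never request a version the proxy cannot parse.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of the assumed mismatch and a clamping mitigation.
// Version numbers are made up for the example.
public class VersionClamp {
    // Advertise the lower of what the broker supports and what the
    // proxy's bundled kafka-clients can actually parse.
    static int clampMaxVersion(int brokerMax, int proxyMax) {
        return Math.min(brokerMax, proxyMax);
    }

    public static void main(String[] args) {
        Map<String, int[]> apis = new HashMap<>();
        // {broker-advertised max, proxy-supported max} -- illustrative.
        apis.put("METADATA", new int[] {11, 9});
        apis.forEach((name, v) ->
            System.out.println(name + " -> advertise v" + clampMaxVersion(v[0], v[1])));
    }
}
```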
At the moment keystore types are hardcoded to "JKS". Support other keystore types.
ea68a78 has an issue when two consumers are started consecutively.
Steps to reproduce:
example/docker-compose.yaml
# Start the test setup
docker-compose -f example/docker-compose.yml up -d
# Create the test topic
docker run --rm --net host -i confluentinc/cp-zookeeper:5.2.1 kafka-topics --create --topic my-test-topic --bootstrap-server localhost:4000 --partitions 1 --replication-factor 1
# Produce to test topic
echo "Hello, kafkaproxy" | docker run --rm --net host -i confluentinc/cp-zookeeper:5.2.1 kafka-console-producer --broker-list localhost:4000 --topic my-test-topic
# Consume 1st time
docker run --rm --net host -it confluentinc/cp-zookeeper:5.2.1 kafka-console-consumer --bootstrap-server localhost:4000 --topic my-test-topic --from-beginning --max-messages 1
# Consume 2nd time
docker run --rm --net host -it confluentinc/cp-zookeeper:5.2.1 kafka-console-consumer --bootstrap-server localhost:4000 --topic my-test-topic --from-beginning --max-messages 1
# The last command fails with a stack trace
The log from the console of the last command is:
[2020-04-02 09:52:22,066] ERROR Error processing message, terminating consumer process: (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'api_versions': Error reading array of size 44, only 21 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:77)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:308)
at org.apache.kafka.common.protocol.ApiKeys$1.parseResponse(ApiKeys.java:152)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:687)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:811)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:439)
at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:105)
at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:77)
at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
Processed a total of 0 messages
Hi,
when trying to connect to a Kafka 3 broker via kafkaproxy with the Java Kafka library in a version greater than 2.8.1, consuming does not work because kafkaproxy seems to announce the real broker's hostname (kafka1) instead of its own (e.g. localhost).
I wrote an MRE in this branch which updates the brokers and then uses the Java Kafka library to produce 10 messages to a topic and then tries to consume them.
Producing works; when consuming, one can see with DEBUG log level java.net.UnknownHostException: kafka1 instead of the expected localhost.
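For context, the rewrite that appears to be skipped here can be sketched as a simple endpoint mapping (hypothetical names, not the actual kafkaproxy classes): every broker endpoint in a Metadata response must be swapped for the proxy's own endpoint before the response reaches the client, otherwise the client resolves the real hostname and fails exactly as above.

```java
import java.util.Map;

// Sketch of the broker-endpoint rewrite a Kafka proxy must perform on
// Metadata responses. Names and types are illustrative.
public class EndpointRewrite {
    record Endpoint(String host, int port) {}

    static Endpoint rewrite(Endpoint real, Map<Endpoint, Endpoint> mapping) {
        // Unmapped endpoints pass through unchanged -- which is the
        // failure mode seen here: the client is told "kafka1:9092".
        return mapping.getOrDefault(real, real);
    }

    public static void main(String[] args) {
        Map<Endpoint, Endpoint> mapping = Map.of(
            new Endpoint("kafka1", 9092), new Endpoint("localhost", 4001));
        System.out.println(rewrite(new Endpoint("kafka1", 9092), mapping));
    }
}
```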
Thanks for the great product!
I am trying to run a Kafka cluster (v3.0.0) with 3 Kafka replicas and 1 ZooKeeper node. On top of it we are using kafkaproxy to set up ingress and allow remote access to the Kafka cluster. The config is as follows:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafkaproxy
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: kafkaproxy
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: kafkaproxy
    spec:
      containers:
        - env:
            - name: KAFKAPROXY_HOSTNAME
              value: kafkaproxy.test.com
            - name: KAFKAPROXY_BASE_PORT
              value: "xxxx"
            - name: KAFKAPROXY_BOOTSTRAP_SERVERS
              value: kafka-kafka-bootstrap.kafka-test.svc:9092
          image: dajudge/kafkaproxy:0.0.18
          imagePullPolicy: IfNotPresent
          name: sampleproxy
          ports:
            - containerPort: 4000
              protocol: TCP
          resources:
            limits:
              cpu: 500m
              memory: 2Gi
            requests:
              cpu: 500m
              memory: 1Gi
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        runAsUser: 0
      terminationGracePeriodSeconds: 30
I have also created a service for the proxy as follows:
apiVersion: v1
kind: Service
metadata:
  name: kafkaproxy
spec:
  ports:
    - name: bootstrap
      port: xxxx
      protocol: TCP
      targetPort: xxxx
    - name: broker-0
      port: xxxx+1
      protocol: TCP
      targetPort: xxxx+1
    - name: broker-1
      port: xxxx+2
      protocol: TCP
      targetPort: xxxx+2
    - name: broker-2
      port: xxxx+3
      protocol: TCP
      targetPort: xxxx+3
  selector:
    app: kafkaproxy
  sessionAffinity: None
  type: ClusterIP
When we try to send messages to the Kafka cluster using the kafkaproxy hostname and port (kafkaproxy.test.com:xxxx), we get errors: the proxy is not able to connect to the Kafka bootstrap service and is unable to obtain brokers to connect to.
The error logs in Kafka proxy are:
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
14:06:27 WARN [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-3) [] Failed to initialize a channel. Closing: [id: 0x7fc14273, L:/10.233.97.180:5060 - R:/100.65.159.39:52216]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-1.kafka-kafka-brokers.kafka1-test.svc/10.233.82.82:9092
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
14:06:27 WARN [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-2) [] Failed to initialize a channel. Closing: [id: 0xbfcc2bd1, L:/10.233.97.180:5058 - R:/10.233.76.0:38834]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-0.kafka-kafka-brokers.kafka-test.svc/10.233.82.81:9092
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
14:06:43 WARN [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-4) [] Failed to initialize a channel. Closing: [id: 59.39:42290]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-2.kafka-kafka-brokers.kafka1
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
14:06:57 WARN [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-3) [] Failed to initialize a channel. Closing: [id: 5.0:32816]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-2.kafka-kafka-brokers.kafka1-n
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
14:06:57 WARN [io.ne.ch.ChannelInitializer] (nioEventLoopGroup-3-2) [] Failed to initialize a channel. Closing: [id: 5.0:45758]: io.netty.channel.ConnectTimeoutException: connection timed out: kafka-kafka-1.kafka-kafka-brokers.kafka1-n
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:261)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:829)
at com.oracle.svm.core.thread.JavaThreads.threadStartRoutine(JavaThreads.java:567)
at com.oracle.svm.core.posix.thread.PosixJavaThreads.pthreadStartRoutine(PosixJavaThreads.java:192)
It gets disconnected from the Kafka bootstrap service even when we are continuously pushing messages through the producer.
Gradle build shows the following:
The findbugs plugin has been deprecated. This is scheduled to be removed in Gradle 6.0. Consider using the com.github.spotbugs plugin instead.
=> Replace findbugs with https://spotbugs.github.io/
I would like to request support for MSK IAM auth --> https://github.com/aws/aws-msk-iam-auth
I was trying to look through the code, but it was not clear whether the above is supported; I did not see how to pass in additional config the way it can be done with a normal Kafka client.
Is there an easy way to pass the config for connection from proxy to broker as per article --> https://aws.amazon.com/blogs/big-data/securing-apache-kafka-is-easy-and-familiar-with-iam-access-control-for-amazon-msk/:
Then you specify the necessary Kafka properties:
ssl.truststore.location=<PATH_TO_TRUST_STORE_FILE>
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
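A purely hypothetical sketch of what such a pass-through could look like (kafkaproxy does not currently document one; the KAFKAPROXY_CLIENT_* prefix is invented for illustration): environment variables are mapped to Kafka client property names, so settings like the ones above could be injected without code changes.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical env-to-properties pass-through; the prefix and mapping
// convention are assumptions, not existing kafkaproxy configuration.
public class EnvToProps {
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        env.forEach((key, value) -> {
            if (key.startsWith("KAFKAPROXY_CLIENT_")) {
                // KAFKAPROXY_CLIENT_SASL_MECHANISM -> sasl.mechanism
                String name = key.substring("KAFKAPROXY_CLIENT_".length())
                        .toLowerCase().replace('_', '.');
                props.setProperty(name, value);
            }
        });
        return props;
    }

    public static void main(String[] args) {
        Properties p = fromEnv(Map.of(
            "KAFKAPROXY_CLIENT_SASL_MECHANISM", "AWS_MSK_IAM",
            "KAFKAPROXY_CLIENT_SECURITY_PROTOCOL", "SASL_SSL"));
        System.out.println(p.getProperty("sasl.mechanism"));
    }
}
```

Note that SASL mechanisms with dotted property names (like sasl.jaas.config) would need an escaping convention this naive underscore mapping cannot express.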
kafkaproxy is implemented in Java, which makes testing Java clients very easy. But there are many different client libraries out there, so we need to implement integration testing for other client libraries as well.
For starters, implement a mechanism with Docker to support testing arbitrary client libraries and use it with the following technologies to prove the concept works:
Hi,
I would like to capture what is consumed and published from an application, and I want the application to go through a Kafka proxy. Does this source code (without docker image) serve this purpose if I used it as a library in my code-base?
Idea:
graph LR
    A[Application]
    subgraph KP[Kafka Proxy]
        B[Consumer]
        C[Producer]
        S[In Memory Message Store]
    end
    A --> KP
    KP --> K[Kafka]
    K --> KP
    subgraph T[Testing]
        V[Observation, shouldBeConsumed, shouldBePublished]
    end
    KP --> T
    KP --> A
kafka {
    shouldBePublished<ExampleMessage> {
        actual.aggregateId == 123
            && actual.metadata.topic == "example-topic"
            && actual.metadata.headers["example-header"] == "example-value"
    }
    shouldBeConsumed<ExampleMessage> {
        actual.aggregateId == 123
            && actual.metadata.topic == "example-topic"
            && actual.metadata.headers["example-header"] == "example-value"
    }
}
The need is coming from this library that I've been developing: https://github.com/Trendyol/stove
Do you think that the idea is possible with the proxy?
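The "In Memory Message Store" box from the diagram can be sketched in Java as a capture store the proxy feeds and the test side queries with predicates, which is essentially what shouldBePublished/shouldBeConsumed would do (hypothetical design, not an existing kafkaproxy API):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical capture store for the proxy-based testing idea above:
// the proxy records every relayed message; tests assert with predicates.
public class MessageStore<T> {
    private final List<T> published = new CopyOnWriteArrayList<>();
    private final List<T> consumed = new CopyOnWriteArrayList<>();

    void onPublished(T msg) { published.add(msg); }
    void onConsumed(T msg) { consumed.add(msg); }

    boolean shouldBePublished(Predicate<T> check) {
        return published.stream().anyMatch(check);
    }

    boolean shouldBeConsumed(Predicate<T> check) {
        return consumed.stream().anyMatch(check);
    }

    public static void main(String[] args) {
        MessageStore<String> store = new MessageStore<>();
        store.onPublished("example-topic:123");
        System.out.println(store.shouldBePublished(m -> m.endsWith(":123")));
    }
}
```

In a real setup the store would hold decoded records (topic, headers, payload) rather than strings, and the assertions would poll with a timeout since relaying is asynchronous.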
Currently the Docker images are GraalVM-based. These images are very large and memory-hungry, and the startup time of about one second is high for cloud-native usage.
Since the kafkaproxy app is already based on the Quarkus framework, it is possible to create a native image that reduces memory consumption by about a factor of ten. Startup times could also drop to around 50 ms.
Downstream TLS already supports reloading of keystores from the filesystem at runtime, upstream doesn't. Fix this.
Hi,
I was trying to use kafkaproxy for AWS MSK.
I was running kafkaproxy from an EC2 instance deployed in the same VPC as the AWS MSK cluster.
AWS MSK has unauthenticated access enabled and the following auth schemes disabled:
I can connect to AWS MSK via kafka-topics.sh using security.protocol=SSL from the EC2 instance. The AWS MSK brokers can only be connected to on port 9094.
I have also tried the KAFKAPROXY_KAFKA_SSL_ENABLED=true flag, but it didn't work.
It would be great if a little guidance could be provided.
Our production environment uses Kafka version 0.10.2. Although pretty old, it is stable and sufficient.
Does Kafkaproxy support Kafka version 0.10.2?