
spark-sql-kafka-offset-committer's Introduction

Kafka offset committer for Spark structured streaming


Kafka offset committer helps structured streaming queries that use the Kafka data source commit the offsets of each processed batch back to Kafka.

This project is not meant to replace Spark's checkpoint mechanism with Kafka's. To provide full fault-tolerance semantics, Spark has to keep 100% control over checkpoint manipulation, and the Kafka data source is no exception. What this project does is let you leverage Kafka ecosystem tools to track the offsets committed in the Spark checkpoint, which is not possible with Spark alone.

This project was inspired by SPARK-27549, which proposed adding this feature to the Spark codebase, but the decision was made not to include it in Spark. You can think of this project as a "follow-up" of SPARK-27549. It is also inspired by Spark Atlas Connector - SAC leverages Scala reflection to extract topic information from the query execution, and Kafka offset committer uses the same approach to extract Kafka parameters. Credits to everyone involved in SPARK-27549 & SAC.

Supported versions

Both Spark 3.0.x and 2.4.x are supported; this only means you should use one of these versions when using this project.

The project is cross-compiled for Scala 2.11 and 2.12 (thanks @redsk!) for Spark 2.4.x; please pick the right artifact for your Scala version.

Spark version   Scala versions   Artifact version
2.4.x           2.11 / 2.12      0.4.0-spark-2.4
3.0.x           2.12             0.4.0-spark-3.0

How to import

Add this to your Maven pom.xml file. If you're using another build tool such as Gradle or sbt, import the artifact accordingly (an sbt example follows the Maven snippet below); groupId: net.heartsavior.spark, artifactId: spark-sql-kafka-offset-committer_<scala_version>.

Please replace the {{...}} placeholders with the values from the matrix above:

<dependency>
  <groupId>net.heartsavior.spark</groupId>
  <artifactId>spark-sql-kafka-offset-committer_{{scala_version}}</artifactId>
  <version>{{artifact_version}}</version>
</dependency>
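
If you build with sbt, the equivalent dependency would look like the following sketch; the %% operator appends the Scala version suffix for you, assuming your project's scalaVersion matches the matrix above:

// sbt sketch: %% resolves to spark-sql-kafka-offset-committer_2.11 or _2.12
// depending on scalaVersion. Replace {{artifact_version}} per the matrix above.
libraryDependencies += "net.heartsavior.spark" %% "spark-sql-kafka-offset-committer" % "{{artifact_version}}"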

You can also include the jar dynamically at submit time by leveraging the --packages option, e.g. --packages net.heartsavior.spark:spark-sql-kafka-offset-committer_{{scala_version}}:{{artifact_version}}. You may want to add --conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener as well: since you're adding the jar dynamically, the class is not accessible in your uber jar.

How to use

Kafka offset committer is implemented as a StreamingQueryListener. There are two ways to enable a streaming query listener:

  1. Attach an instance of KafkaOffsetCommitterListener programmatically:

     val listener = new KafkaOffsetCommitterListener()
     spark.streams.addListener(listener)

  2. Add net.heartsavior.spark.KafkaOffsetCommitterListener to the value of spark.sql.streaming.streamingQueryListeners in your Spark config; a sketch of setting this programmatically follows this list. (The value is comma-separated, so you can list multiple listeners if you have any others.)
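
For the second approach, here is a minimal sketch of setting the config on the SparkSession builder; the app name is illustrative, while the config key and listener class are the ones named above:

import org.apache.spark.sql.SparkSession

// Enable the listener via Spark config instead of calling addListener.
// The app name is illustrative.
val spark = SparkSession.builder()
  .appName("kafka-offset-committer-example")
  .config("spark.sql.streaming.streamingQueryListeners",
    "net.heartsavior.spark.KafkaOffsetCommitterListener")
  .getOrCreate()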

Once the listener is set, you can add a special option to the Kafka data source options so that the Kafka offset committer can see the group ID to commit with:

spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribePattern", "topic[1-3]")
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "groupId1")
  .load()

"kafka.consumer.commit.groupid" is the new config to specify consumer group ID to commit. Manually specifying consumer group ID is needed, because Spark will assign unique consumer group ID to avoid multiple queries being conflicted to each other. This also means, you may want to thoughtfully set the option and decide the name of group ID so that multiple queries don't use the same group ID for committing.

For technical reasons, the project uses reflection to extract options from the query execution. Since we intercept the Kafka parameters rather than the source options of the DataSource, prefixing the option key with "kafka." is necessary, and it triggers unintended warning messages from the Kafka side. (Sorry!) You can adjust your log4j config to hide the warning messages, as sketched below.
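
A minimal log4j sketch, assuming log4j 1.x properties (as shipped with Spark 2.4/3.0) and assuming the "unknown config" warnings are emitted by the Kafka consumer config logger:

# Raise the consumer config logger above WARN so that
# "The configuration '...' was supplied but isn't a known config" is hidden.
log4j.logger.org.apache.kafka.clients.consumer.ConsumerConfig=ERROR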

Here's an example command that runs spark-shell with the Kafka committer listener set, plus a simple query that reads from Kafka topics and writes to a Kafka topic.

command

./bin/spark-shell --master "local[3]" --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 --jars ./spark-sql-kafka-offset-committer-0.1.0-SNAPSHOT.jar --conf spark.sql.streaming.streamingQueryListeners=net.heartsavior.spark.KafkaOffsetCommitterListener

query

val bootstrapServers = "localhost:9092"
val checkpointLocation = "/tmp/mykafkaaaaaaa"
val sourceTopics = Seq("truck_events_stream").mkString(",")
val sourceTopics2 = Seq("truck_speed_events_stream").mkString(",")

val targetTopic = "sparksinkstreaming"

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", sourceTopics)
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "spark-sql-kafka-offset-committer-test-1")
  .load()

val df2 = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", sourceTopics2)
  .option("startingOffsets", "earliest")
  .option("kafka.consumer.commit.groupid", "spark-sql-kafka-offset-committer-test-1")
  .load()

val query = df.union(df2).writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("checkpointLocation", checkpointLocation)
  .option("topic", targetTopic)
  .option("kafka.atlas.cluster.name", "sink")
  .start()

result

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group spark-sql-kafka-offset-committer-test-1
Consumer group 'spark-sql-kafka-offset-committer-test-1' has no active members.

TOPIC                                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
truck_speed_events_stream                5          844553          844577          24              -               -               -
truck_speed_events_stream                2          675521          675540          19              -               -               -
truck_speed_events_stream                6          168828          168833          5               -               -               -
truck_speed_events_stream                3          337819          337827          8               -               -               -
truck_speed_events_stream                7          675566          675585          19              -               -               -
truck_speed_events_stream                4          168914          168919          5               -               -               -
truck_speed_events_stream                0          168894          168899          5               -               -               -
truck_speed_events_stream                8          675570          675589          19              -               -               -
truck_speed_events_stream                1          168917          168922          5               -               -               -
truck_events_stream                      0          3884586         3884695         109             -               -               -
truck_speed_events_stream                9          0               0               0               -               -               -

After stopping the ingestion of records and waiting for the query to fully process the remaining records:

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group spark-sql-kafka-offset-committer-test-1
Consumer group 'spark-sql-kafka-offset-committer-test-1' has no active members.

TOPIC                                    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
truck_speed_events_stream                5          856338          856338          0               -               -               -
truck_speed_events_stream                2          684958          684958          0               -               -               -
truck_speed_events_stream                6          171186          171186          0               -               -               -
truck_speed_events_stream                3          342534          342534          0               -               -               -
truck_speed_events_stream                7          684998          684998          0               -               -               -
truck_speed_events_stream                4          171272          171272          0               -               -               -
truck_speed_events_stream                0          171255          171255          0               -               -               -
truck_speed_events_stream                8          684999          684999          0               -               -               -
truck_speed_events_stream                1          171276          171276          0               -               -               -
truck_events_stream                      0          3938820         3938820         0               -               -               -
truck_speed_events_stream                9          0               0               0               -               -               -

License

Copyright 2019-2021 Jungtaek Lim "[email protected]"

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

spark-sql-kafka-offset-committer's People

Contributors

heartsavior, redsk, szvasas


spark-sql-kafka-offset-committer's Issues

ERROR Utils: uncaught error in thread spark-listener-group-streams

Getting an exception while using spark-sql-kafka-offset-committer.
Version:

<dependency>
    <groupId>net.heartsavior.spark</groupId>
    <artifactId>spark-sql-kafka-offset-committer_2.12</artifactId>
    <version>0.4.0-spark-3.0</version>
</dependency>

Error:
23/12/06 05:25:45 ERROR Utils: uncaught error in thread spark-listener-group-streams, stopping SparkContext
java.lang.NoSuchMethodError: 'org.apache.spark.sql.connector.read.InputPartition org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartition()'
at org.apache.spark.sql.kafka010.KafkaSourceInspector.$anonfun$extractSourceTopicsFromDataSourceV2$2(KafkaSourceInspector.scala:112)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:293)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:290)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:198)
at org.apache.spark.sql.kafka010.KafkaSourceInspector.$anonfun$extractSourceTopicsFromDataSourceV2$1(KafkaSourceInspector.scala:111)
at org.apache.spark.sql.kafka010.KafkaSourceInspector.$anonfun$extractSourceTopicsFromDataSourceV2$1$adapted(KafkaSourceInspector.scala:110)
at scala.collection.immutable.List.flatMap(List.scala:366)
at org.apache.spark.sql.kafka010.KafkaSourceInspector.extractSourceTopicsFromDataSourceV2(KafkaSourceInspector.scala:110)
at org.apache.spark.sql.kafka010.KafkaSourceInspector.$anonfun$populateKafkaParams$1(KafkaSourceInspector.scala:40)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:293)

Spark to read last committed offsets

Hi,
We tried your approach and are able to commit the offsets manually with kafkaConsumer.commitSync(). But when we try to read the last committed offsets through Spark Structured Streaming with .option("startingOffsets", "earliest"), it does not pick up the latest committed offsets.

At present we are using another Kafka consumer to get the last committed offsets of all partitions of specific topics by passing the same group ID, and then passing those offsets to Spark like option("startingOffsets", "{\"topicName\":{\"0\":2764}}") to read the data, and it works as expected.

Please let us know if there is any way to get the last committed offsets through Spark Structured Streaming.

Can sparkstreaming use this assembly

I use spark-streaming-kafka-0-10_2.12 to read from Kafka.
It seems this library can only be used with Structured Streaming.
Can Spark Streaming use this assembly? It doesn't matter if it's not possible; I'll take some time to change my code to use Structured Streaming.


Confirmation: manual offset commit on Kafka using Spark

Hi,

As part of manual offset commits, we followed your approach and implemented it as below. Can you please confirm?

  1. In the onQueryProgress(QueryProgressEvent queryProgress) method we get the offset and partition details.

  2. From queryProgress -> endOffset we get the offset and prepare an OffsetAndMetadata.

  3. From queryProgress -> endOffset we get the partition and prepare a TopicPartition.

  4. We use the same group ID value for the kafkaConsumer as the one we are processing with in the Spark readStream().

  5. Finally we commit the offsets via the consumer like below (a sketch putting these steps together follows this list):

         Map<TopicPartition, OffsetAndMetadata> offsets
         kafkaConsumer.commitSync(offsets);
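
For reference, a sketch in Scala (not the library's actual code, and untested) of how steps 1-5 could fit together in a StreamingQueryListener; the parseOffsets helper is hypothetical and assumes endOffset is a JSON string like {"topicName":{"0":2764}}:

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// The consumer is assumed to be created with the same group.id used for committing (step 4).
class ManualCommitListener(consumer: KafkaConsumer[_, _]) extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}

  // Step 1: offset and partition details arrive via the progress event.
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    event.progress.sources.foreach { source =>
      // Steps 2 & 3: parse endOffset into TopicPartition -> OffsetAndMetadata.
      val offsets = parseOffsets(source.endOffset)
      // Step 5: commit the offsets synchronously.
      consumer.commitSync(offsets.asJava)
    }
  }

  // Hypothetical helper: endOffset is JSON like {"topic":{"0":2764,"1":100}}.
  private def parseOffsets(json: String): Map[TopicPartition, OffsetAndMetadata] = {
    implicit val formats: Formats = DefaultFormats
    parse(json).extract[Map[String, Map[String, Long]]].flatMap {
      case (topic, partitionToOffset) =>
        partitionToOffset.map { case (partition, offset) =>
          new TopicPartition(topic, partition.toInt) -> new OffsetAndMetadata(offset)
        }
    }
  }
}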
    

We are able to successfully commit offsets for multiple partitions using the above approach.

The next time Spark reads the Kafka topic data with readStream(), we use startingOffsetDetails() to get the starting offsets that were committed through the Kafka consumer:
Dataset<Row> dataFrame = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", kafkaTopic)
    .option("enable.auto.commit", false)
    .option("group.id", groupIdValue)
    .option("startingOffsets", startingOffsetDetails())
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("partition", "offset", "deserialize(value) as value");

// Code to get committed offsets.
// The method below returns the committed offsets as JSON, e.g. {"epeOffsetTpMutltiPartition":{"0":34278,"1":33778}}
private static String startingOffsetDetails() {
    String offsetDetails = "";
    Properties properties = getProperties();
    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
    List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(kafkaTopic);
    StringBuffer sb = new StringBuffer();
    sb.append("{\"" + kafkaTopic + "\":{");
    for (PartitionInfo partitionInfo : partitions) {
        TopicPartition tp = new TopicPartition(kafkaTopic, partitionInfo.partition());
        OffsetAndMetadata offsetAndMetadata = kafkaConsumer.committed(tp);
        if (null == offsetAndMetadata) {
            sb.append("\"" + partitionInfo.partition() + "\":" + 0 + ",");
        } else {
            sb.append("\"" + partitionInfo.partition() + "\":" + offsetAndMetadata.offset() + ",");
        }
    }
    sb.deleteCharAt(sb.lastIndexOf(","));
    sb.append("}}");
    offsetDetails = sb.toString();
    System.out.println(offsetDetails);

    return offsetDetails;
}

Can you please confirm the same from your end.

ERROR Build and Tests failing for Spark 3.5

Hi there HeartSaVioR.

You are a legend in this space for providing a library to commit offsets back to Kafka.

I have various businesses that require monitoring of consumers and this is a lifesaver.

Recently I looked into upgrading this library to Spark 3.5.

Through my investigation, I found that there have been significant changes in Spark that now break this library for a potential upgrade.

Specifically here: https://github.com/apache/spark/blob/v3.5.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L32

inputPartition is now inputPartitions, which is a Seq of partitions :( (one possible reflection workaround is sketched below).
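
A hypothetical, untested sketch of reflection that tolerates both shapes, reading either the new inputPartitions Seq or the old single inputPartition:

// Hypothetical sketch: fetch DataSourceRDDPartition's input partition(s) via
// reflection, handling both the Spark 3.5+ and earlier shapes.
def inputPartitionsOf(partition: org.apache.spark.Partition): Seq[Any] = {
  val clazz = partition.getClass
  try {
    // Spark 3.5+: inputPartitions() returns a Seq of InputPartition.
    clazz.getMethod("inputPartitions").invoke(partition).asInstanceOf[Seq[Any]]
  } catch {
    case _: NoSuchMethodException =>
      // Earlier Spark: inputPartition() returns a single InputPartition.
      Seq(clazz.getMethod("inputPartition").invoke(partition))
  }
}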

There are definitely a lot more changes that break the tests as they stand, so we'll be stuck at Spark 3.0.x.

I've already tried the other versions up to 3.5.x.

I'll be reading this code more thoroughly as time allows and seeing if I have the opportunity to make changes and keep this alive.

Otherwise, just letting you know and to also make this visible to you and others.

how to import this app

Hello, I'm a beginner.
I want to ask how to import this program.
I didn't find it in Maven.

Library not working when Spark job deployed in Databricks 7.3 LTS

The library is not working when the Spark job is deployed on Databricks 7.3 LTS with Spark 3.0.1 (Scala 2.12) + net.heartsavior.spark:spark-sql-kafka-offset-committer_2.12:0.3.0-spark-3.0.

The following warning is displayed in the logs; however, the same setup works in a local Spark standalone cluster (with the Apache Spark distribution):

WARN KafkaOffsetCommitterListener: Cannot find query f835de4b-2919-4f60-b849-6082d294e4f7 from active spark session!

Does this library work fine with Kafka SASL_SSL?

Please let me know what the issue is.
