
Kotlin for Apache® Spark™


Your next API to work with Apache Spark.

This project adds a missing layer of compatibility between Kotlin and Apache Spark. It allows Kotlin developers to use familiar language features such as data classes and lambda expressions as simple expressions in curly braces or method references.

We have opened a Spark Project Improvement Proposal: Kotlin support for Apache Spark to work with the community towards getting Kotlin support as a first-class citizen in Apache Spark. We encourage you to voice your opinions and participate in the discussion.


Supported versions of Apache Spark

Apache Spark | Scala | Kotlin for Apache Spark
------------ | ----- | -------------------------------
3.2.1+       | 2.12  | kotlin-spark-api-3.2:1.1.0
3.1.3+       | 2.12  | kotlin-spark-api-3.1:1.1.0
3.0.3+       | 2.12  | kotlin-spark-api-3.0:1.1.0
2.4.1+       | 2.12  | kotlin-spark-api-2.4_2.12:1.0.2
2.4.1+       | 2.11  | kotlin-spark-api-2.4_2.11:1.0.2

Releases

The list of Kotlin for Apache Spark releases is available here. The Kotlin for Spark artifacts adhere to the following convention: [Apache Spark version]_[Scala core version]:[Kotlin for Apache Spark API version]

The artifacts are also available on Maven Central.

How to configure Kotlin for Apache Spark in your project

You can add Kotlin for Apache Spark as a dependency to your project: Maven, Gradle, SBT, and Leiningen are supported.

Here's an example pom.xml:

<dependency>
  <groupId>org.jetbrains.kotlinx.spark</groupId>
  <artifactId>kotlin-spark-api-3.2</artifactId>
  <version>${kotlin-spark-api.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>${spark.version}</version>
</dependency>

Note that the core is compiled against Scala 2.12.
You can find a complete example with pom.xml and build.gradle in the Quick Start Guide.
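
For Gradle, a roughly equivalent dependency block might look like the following sketch (Kotlin DSL; the version numbers are taken from the table above, adjust them to your setup):

dependencies {
    // Kotlin for Apache Spark, compiled against Spark 3.2 / Scala 2.12
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api-3.2:1.1.0")

    // Spark SQL itself stays a separate dependency, just as in the Maven example above
    implementation("org.apache.spark:spark-sql_2.12:3.2.1")
}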

Once you have configured the dependency, you only need to add the following import to your Kotlin file:

import org.jetbrains.kotlinx.spark.api.*

Jupyter

The Kotlin Spark API also supports Kotlin Jupyter notebooks. To use it, simply add

%use spark

to the top of your notebook. This will fetch the latest version of the API together with the latest version of Spark. To pin a specific version of Spark or of the API itself, add it like this:

%use spark(spark=3.2, v=1.1.0)

Inside the notebook, a Spark session is started automatically and can be accessed via the spark value. sc: JavaSparkContext is also available directly. Apart from that, the API works the same as in a regular Kotlin program.
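
For example, a notebook cell can use the pre-created session directly (a minimal sketch):

// `spark` is provided by the %use spark magic, so no builder call is needed
spark.dsOf(1, 2, 3).show()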

There is also support for HTML rendering of Datasets and simple (Java)RDDs. Check out the example as well.

To use the Spark Streaming capabilities, use instead

%use spark-streaming

This does not start a Spark session right away, meaning you can call withSparkStreaming(batchDuration) {} in whichever cell you want. Check out the example.

NOTE: You need kotlin-jupyter-kernel to be at least version 0.11.0.83 for the Kotlin Spark API to work. Also, if the %use spark magic does not output "Spark session has been started...", and %use spark-streaming doesn't work at all, add %useLatestDescriptors above it.

For more information, check the wiki.

Kotlin for Apache Spark features

Creating a SparkSession in Kotlin

val spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("Simple Application")
    .orCreate

This is not needed when running the Kotlin Spark API from a Jupyter notebook.

Creating a Dataset in Kotlin

spark.dsOf("a" to 1, "b" to 2)

The example above produces Dataset<Pair<String, Int>>. Kotlin Pairs and Triples are supported, but Scala Tuples are recommended for better Spark support (see the Tuples section below).
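
For instance, the same Dataset can be built from Scala tuples (a small sketch; tupleOf comes from the tuples package covered in the Tuples section):

import org.jetbrains.kotlinx.spark.api.tuples.*

// produces Dataset<Tuple2<String, Int>> instead of Dataset<Pair<String, Int>>
spark.dsOf(tupleOf("a", 1), tupleOf("b", 2))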

Null safety

There are several aliases in the API, like leftJoin, rightJoin, etc. These are null-safe by design. For example, leftJoin is aware of nullability and returns Dataset<Pair<LEFT, RIGHT?>>. Note that we are forcing RIGHT to be nullable so that you, as a developer, are able to handle this situation. NullPointerExceptions are hard to debug in Spark, and we're doing our best to make them as rare as possible.

In Spark, you might also come across Scala-native Option<*> or Java-compatible Optional<*> classes. We provide getOrNull() and getOrElse() functions for these to use Kotlin's null safety for good.

Similarly, you can create Option<*>s and Optional<*>s via T?.toOptional() when a Spark function requires one.
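
A rough sketch of how these helpers read in practice (the Option here is constructed by hand purely for illustration):

import scala.Option

// a Scala Option, e.g. as returned by some Spark API
val maybeName: Option<String> = Option.apply("Alice")

// getOrNull() bridges it into Kotlin's null safety
val name: String? = maybeName.getOrNull()

// and a nullable value can be wrapped again when a Spark function expects an Optional
val wrapped = name.toOptional()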

withSpark function

We provide the useful withSpark function, which accepts everything that may be needed to run Spark — properties, app name, master location and so on. It also accepts a block of code to execute inside the Spark context.

After the work block ends, spark.stop() is called automatically.

Do not use this when running the Kotlin Spark API from a Jupyter notebook.

withSpark {
    dsOf(1, 2)
        .map { it X it } // creates Tuple2<Int, Int>
        .show()
}

dsOf is just another way to create a Dataset (here, Dataset<Int>) from varargs.
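
As a sketch of passing configuration to withSpark (the exact parameter names used here — props, master, appName — are our reading of the API and may differ slightly; check the wiki):

// assumed parameter names, shown for illustration only
withSpark(
    props = mapOf("spark.sql.shuffle.partitions" to "1"),
    master = "local[2]",
    appName = "Simple Application"
) {
    dsOf(1, 2, 3).show()
}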

withCached function

It can easily happen that we need to fork our computation into several paths. To compute things only once, we should call the cache method. However, it becomes difficult to keep track of when we're working with a cached Dataset and when we're not. It is also easy to forget to unpersist cached data, which can break things unexpectedly or take up more memory than intended.

To solve these problems, we've added the withCached function:

withSpark {
    dsOf(1, 2, 3, 4, 5)
        .map { tupleOf(it, it + 2) }
        .withCached {
            showDS()
  
            filter { it._1 % 2 == 0 }.showDS()
        }
        .map { tupleOf(it._1, it._2, (it._1 + it._2) * 2) }
        .show()
}

Here we're showing the cached Dataset for debugging purposes and then filtering it. The filter method returns the filtered Dataset, after which the cached Dataset is unpersisted, so we have more memory to call the map method and collect the resulting Dataset.

toList and toArray methods

For more idiomatic Kotlin code, we've added toList and toArray methods to this API. You can still use the collect method as in the Scala API, but the result then has to be cast to an Array. This is because collect returns a Scala array, which is not the same as a Java/Kotlin one.
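
A minimal sketch (depending on the API version, the explicit type argument may not be required):

withSpark {
    val ds = dsOf(1, 2, 3)

    // a Kotlin List<Int>, no cast from a Scala array required
    val list = ds.toList<Int>()

    // likewise a proper Kotlin/Java Array<Int>
    val array = ds.toArray<Int>()
}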

Column infix/operator functions

Similar to the Scala API for Columns, many of the operator functions have been ported over. For example:

dataset.select( col("colA") + 5 )
dataset.select( col("colA") / col("colB") )

dataset.where( col("colA") `===` 6 )
// or alternatively
dataset.where( col("colA") eq 6)

To read more, check the wiki.

Overload resolution ambiguity

We had to implement the functions reduceGroups and reduce for Kotlin separately as reduceGroupsK and reduceK respectively, because they otherwise caused resolution ambiguity between the Kotlin, Scala, and Java APIs, which was quite hard to resolve.

There is a dedicated example of working with these functions in the Groups example.
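
A minimal sketch of the Kotlin-specific variant (the data class and field names are hypothetical):

// hypothetical data class used only for illustration
data class Event(val user: String, val amount: Int)

withSpark {
    dsOf(Event("a", 1), Event("a", 2), Event("b", 3))
        .groupByKey { it.user }
        // reduceGroupsK avoids the Kotlin/Scala/Java overload clash of reduceGroups
        .reduceGroupsK { left, right -> Event(left.user, left.amount + right.amount) }
        .show()
}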

Tuples

Inspired by ScalaTuplesInKotlin, the API introduces a lot of helper extension functions to make working with Scala Tuples a breeze in your Kotlin Spark projects. While working with data classes is encouraged, Scala Tuples are recommended for pair-like Datasets / RDDs / DStreams, both for the useful helper functions and for Spark performance. To enable these features, simply add

import org.jetbrains.kotlinx.spark.api.tuples.*

to the start of your file.

Tuple creation can be done in the following manners:

val a: Tuple2<Int, Long> = tupleOf(1, 2L)
val b: Tuple3<String, Double, Int> = t("test", 1.0, 2)
val c: Tuple3<Float, String, Int> = 5f X "aaa" X 1
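
Among the added helpers are, for example, destructuring declarations for Scala tuples (a small sketch; refer to the wiki for the full list):

val tuple = tupleOf(1, "two", 3.0)

// componentN() helpers allow Kotlin destructuring of Scala tuples
val (i, s, d) = tuple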

To read more about tuples and all the added functions, refer to the wiki.

Streaming

A popular Spark extension is Spark Streaming. The Kotlin Spark API also introduces a more Kotlin-esque approach to writing streaming programs. There are examples for use with a checkpoint, Kafka, and SQL in the examples module.

Here's a quick example:

// a simple data class holding each word, defined here so the example is self-contained
data class TestRow(val word: String)

// Automatically provides ssc: JavaStreamingContext, which starts and awaits termination or timeout
withSparkStreaming(batchDuration = Durations.seconds(1), timeout = 10_000) { // this: KSparkStreamingSession

    // create an input stream from, for instance, Netcat: `$ nc -lk 9999`
    val lines: JavaReceiverInputDStream<String> = ssc.socketTextStream("localhost", 9999)

    // split the input stream on spaces
    val words: JavaDStream<String> = lines.flatMap { it.split(" ").iterator() }

    // perform an action on each RDD formed in the stream
    words.foreachRDD { rdd: JavaRDD<String>, _: Time ->

        // to convert the JavaRDD to a Dataset, we need a Spark session using the RDD's context
        withSpark(rdd) { // this: KSparkSession
            val dataset: Dataset<TestRow> = rdd.map { TestRow(word = it) }.toDS()
            dataset
                .groupByKey { it.word }
                .count()
                .show()
            // +-----+--------+
            // |  key|count(1)|
            // +-----+--------+
            // |hello|       1|
            // |   is|       1|
            // |    a|       1|
            // | this|       1|
            // | test|       3|
            // +-----+--------+
        }
    }
}

For more information, check the wiki.

Examples

For more, check out the examples module. To get up and running quickly, check out this tutorial.

Reporting issues/Support

Please use GitHub issues for filing feature requests and bug reports. You are also welcome to join the kotlin-spark channel in the Kotlin Slack.

Code of Conduct

This project and the corresponding community are governed by the JetBrains Open Source and Community Code of Conduct. Please make sure you read it.

License

Kotlin for Apache Spark is licensed under the Apache 2.0 License.
