

Sparkling - A Clojure API for Apache Spark

Sparkling is a Clojure API for Apache Spark.

Show me a small sample

(do
  (require '[sparkling.conf :as conf])
  (require '[sparkling.core :as spark])
  (spark/with-context sc (-> (conf/spark-conf)              ; this creates a Spark context from the given config
                             (conf/app-name "sparkling-test")
                             (conf/master "local"))
                      (let [lines-rdd (spark/into-rdd sc ["This is the first line"   ;; here we provide data from a clojure collection.
                                                          "Testing spark"           ;; You could also read from a text file, or avro file.
                                                          "and sparkling"           ;; You could even approach a JDBC datasource
                                                          "Happy hacking!"])]
                        (spark/collect                      ;; get every element from the filtered RDD
                          (spark/filter                     ;; filter elements in the given RDD (lines-rdd)
                            #(.contains % "spark")          ;; a pure clojure function as filter predicate
                            lines-rdd)))))
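Running this at the REPL should collect only the two lines containing "spark":

;; => something like ("Testing spark" "and sparkling")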

Where to find more info

Check out our site for information about Gorillalabs Sparkling and a getting started guide.

Sample Project repo available

Just clone our getting-started repo and get going right now.

But note: certain namespaces need to be AOT-compiled, e.g. because their classes are referenced by name during the startup process. I'm doing this in my project.clj using the :aot directive like this

            :aot [#".*" sparkling.serialization sparkling.destructuring]

Availability from Clojars

Sparkling is available from Clojars. To use it with Leiningen, add the gorillalabs/sparkling dependency to your :dependencies (see Clojars for the current version).
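As an illustration (the version below is a placeholder; check Clojars for the current release), the dependency entry looks roughly like this:

            :dependencies [[gorillalabs/sparkling "X.Y.Z"]]   ;; replace X.Y.Z with the current Clojars release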


Release Notes

2.0.0 - switch to Spark 2.0

  • added support for Spark SQL

1.2.3 - more developer friendly

  • added @/deref support for broadcasts, making it easier to work with broadcasts by using Clojure mechanisms (see the sketch after this list). This is especially helpful in unit tests, as you can test without actual broadcasts, using anything deref-able instead.
  • added RDD auto-naming from fn metadata, which eases navigation in the Spark UI.
  • added lookup functionality. Make sure the key of your tuples is Serializable (Java serialization), as it will be serialized as part of your task definition, not only as part of your data; the two are handled differently in Spark.
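As a rough sketch of the deref support (assuming sparkling.core exposes the broadcast wrapper; sc, the broadcast value and words-rdd are placeholders):

(let [bc (spark/broadcast sc {:stopwords #{"the" "a" "an"}})]
  (spark/map (fn [word] [word (contains? (:stopwords @bc) word)]) words-rdd))
;; in a unit test, an atom or a delay can stand in for bc, since anything deref-able works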

1.2.2 - added whole-text-files in sparkling.core.

(thanks to Jase Bell)

1.2.1 - improved Kryo Registration, AVRO reader + new Accumulator feature

  • feature: added accumulators (Thanks to Oleg Smirnov for that)
  • change: overhaul of Kryo registration: deprecated the defregistrator macro, added a Registrator type (see sparkling.serialization) with basic support for the required types. This introduces a breaking change (sorry!): you need to AOT-compile (or require) sparkling.serialization to run things in the REPL.
  • feature: added support for your own Avro readers, making it possible to read types/records instead of maps. Major improvement in memory consumption.

1.1.1 - cleaned dependencies

  • No more leaking of unwanted dependencies into your application: you only need to depend on Sparkling to get a proper environment with Spark 1.2.1. To deploy to a cluster with Spark pre-installed, though, you need to mark the Spark dependency as provided in your project, roughly as sketched below.
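A minimal sketch of the corresponding project.clj entry (the Spark version is the one named above; adjust it to your cluster):

            :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.10 "1.2.1"]]}}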

1.1.0 - Added a more clojuresque API

  • Use sparkling.core instead of sparkling.api for a parameter order similar to Clojure's, which makes partial application with partial straightforward (see the sketch after this list).
  • Made it possible to use Keywords as Functions by serializing IFn instead of AFunction.
  • Tested with Spark 1.1.0 and Spark 1.2.1.
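A small sketch of what the Clojure-like argument order buys you (spark here is sparkling.core; events-rdd and the :error? key are made-up placeholders):

;; function-first argument order makes partial application read like ordinary Clojure:
(def only-errors (partial spark/filter :error?))    ;; keywords now work as functions, see above
(def count-errors (comp spark/count only-errors))
;; (count-errors events-rdd)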

1.0.0 - Added value to the existing libraries (clj-spark and flambo)

  • It's about twice as fast by getting rid of a reflection call (thanks to David Jacot for his take on this).
  • Got rid of mapping/remapping inside the api functions, which
    • slimmed down the execution plan (mine shrank to a third) and
    • (more importantly) allowed me to keep partitioner information.
  • added more -values functions (e.g. map-values), again to keep partitioner information.
  • Additional Sources for RDDs:
    • JdbcRDD: Reading Data from your JDBC source.
    • Hadoop-Avro-Reader: Reading AVRO Files from HDFS

Contributing

Feel free to fork the Sparkling repository, improve stuff and open up a pull request against our "develop" branch. However, we'll only add features with tests, so make sure everything is green ;)

Acknowledgements

Thanks to The Climate Corporation and their open source clj-spark project, and to Yieldbot for yieldbot/flambo which served as the starting point for this project.

License

Copyright (C) 2014-2015 Dr. Christian Betz, and the Gorillalabs team.

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.


sparkling's Issues

Initial job has not accepted any resources – Clojure-specific Problem

Hi,

I have a standalone spark cluster running on my test machine (version 1.1.0 compiled from source without hadoop) and want to use it in a simple demo app with sparkling.

Here's the minimal source code I used to reproduce the problem:

There are no other workers on the cluster using the memory. Here is the status of the cluster before the execution of the job:

[screenshot: cluster status before the job, 2015-01-08 15:20:54]

When I run the demo app, the following error is raised:

Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory.

(Full job logs: https://gist.github.com/rizo/2662a1a4c81d1c572d09)

After the submission, the application occupies all of the cores, uses 512 MB, and exits after a 30-second timeout:

[screenshot: cluster status after submission, 2015-01-08 15:21:17]

Initially I thought there was a problem with my cluster configuration, but spark-shell and pyspark are working without any problems. Also I successfully executed the job with spark-submit using the uberjar built with the same spark-base project.

What could be wrong here? Are there any Clojure-specific settings I have to set to run the app on the cluster?

Thanks in advance for your response!

Should we support Clojuresque parameter order?

We just kept the parameter order from clj-spark and flambo to simplify migration. However, it could be more intuitive to not follow clj-spark, but to follow the Clojure way of parameter order.

E.g., (map function collection) -> (spark/map function rdd).

Idea is to have a separate namespace with functions working the clojure way, fading out support for the clj-spark way after a time of having both in parallel.

What do you think?

Spark MLlib support?

Is there support for using the Spark MLlib (machine learning) API? If not, are there any plans to support this, or alternative Clojure libraries that do? Thanks, -David

Config could be a map.

Hey,
I was wondering if it wasn't more clojuresque to provide the configuration as a hashmap,
instead of building it up.

A very very early version of clj-spark had it implemented that way.

Reloaded workflow

I have some ideas for setting up sparkling to work in a "reloaded" style but I don't have time to track down the issues right now. Maybe someone has the answers here.

The desired workflow is: edit your source code locally, new source code is automatically reloaded on the driver and workers. To me this is a much more sensible approach than trying to define stuff over nrepl to each worker.

Poking around, AOT is not strictly necessary for function serialization. If the clojure namespaces are loaded, the corresponding classes are created and will serialize-deserialize as expected. (Currently this is failing for records, but I think that just needs more kryo definitions). You can see this just by defining an eval function, mapping it across quoted defn's on the workers, evaling the defn and using it as another spark op in the driver. I haven't tried pomegranate yet, hopefully we can even dynamically fetch dependencies.

Given this works, it should be possible to achieve the reloaded workflow.
The steps would be:

  1. Detect change in local file system and produce jar of just the src files
  2. Put that jar in the distributed cache
  3. Invoke a reload fn on each executor using map-partitions and collect'ing (put some logic in so it only happens once per execution)
  4. Reload fn can use tools.namespace to figure out what to reload. (This might require some poking around tools.namespace)

Presumably one would want to have separate namespaces for the driver and the workers, and set up some kind of component at each one.

Serializer issues while running on Mesos

I'm running into sparkling.serialization.Registrator issues with Kryo when running Sparkling on Mesos:

task 0.3 in stage 0.0 (TID 3, test-server): java.io.IOException: org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to register classes with Kryo
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:101)
        at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:153)
        at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:115)
        at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:214)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 11 more
Caused by: java.lang.ClassNotFoundException: sparkling.serialization.Registrator
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
        at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$3.apply(KryoSerializer.scala:97)
        at scala.Option.map(Option.scala:145)
        at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:97)
        ... 16 more

The configuration is as follows --

(defn make-spark-context []
  (let [c (-> (conf/spark-conf)
              (conf/master "mesos://zk://zk0:2181,zk1:2181,zk2:2181/mesos")
              (conf/app-name "test-app")
              (conf/set "spark.executor.uri" "hdfs://hadoop-namenode/spark/spark-1.3.0-bin-hadoop2.3.tgz")
              (conf/set "spark.ui.port" "5555")
              (conf/set "spark.home" "/usr/lib/spark/spark-1.3.0-bin-hadoop2.3"))]
    (spark/spark-context c)))

I'm reading a gzipped text file from a hdfs storage.

(let [sc (make-spark-context)
       file-rdd (spark/text-file "hdfs://hadoop-namenode/example/test.gz")]
     (println (.first file-rdd)))

This works fine when the master is set to "local[*]"

Reflection warning when using destructuring sugar

The problem

I'm observing some Reflection Warnings at the REPL when calling the s-de/fn macro, e.g

(do
  (set! *warn-on-reflection* true)

  (s-de/fn
    [(x ?y)]
    (+ x (or y 0))))
;Reflection warning, /Users/val/projects/.../data_sources.clj:48:3 - reference to field _1 can't be resolved.
;Reflection warning, /Users/val/projects/.../data_sources.clj:48:3 - reference to field _2 can't be resolved.
;Reflection warning, /Users/val/projects/.../data_sources.clj:48:3 - reference to field orNull can't be resolved.
=>
#object[linnaeus.imports.spark_scripts.data_sources$eval25833$fn__25834
        0x49b1f844
        "linnaeus.imports.spark_scripts.data_sources$eval25833$fn__25834@49b1f844"]

Why I think it's important

Method/field calls done via reflection have very bad performance, which tends to matter in the context of sparkling IMO - the destructuring code will tend to be called for each element to process.

Investigating the cause

Looking at the source code for sparkling.destructuring, I suspect this is because the TupleN class on which to call ._1, ._2 etc. can't be inferred statically (which is understandable in the context of Clojure).

Proposed solutions

For tuple destructuring

I suggest giving the caller the opportunity to provide a type-hint using meta-data on the list form, e.g

(s-de/fn
  [^{::s-de/type-hint Scala.Tuple2} (x ?y)]
  (+ x (or y 0)))

Or maybe the higher-level:

(s-de/fn
  [^{::s-de/tuple-arity 2} (x ?y)]
  (+ x (or y 0)))

From there, the macro could emit a type hint itself, or dispatch to a function that has a type hint etc.

This is not super readable, so we may complement this by:

  1. Having a single boolean flag and let the macro figure the Tuple arity from the length of the list:
(s-de/fn
  [^::s-de/type-hinted (x ?y)]
  (+ x (or y 0)))
  2. Having this type-hint on the argument vector itself, specifying that all nested tuples should be automatically type-hinted:
(s-de/fn
  ^::s-de/type-hinted [(x ?y (u w z))]
  (+ x (or y 0)))

For Optional Destructuring

Actually, this one can be inferred statically - all it takes is to have the emitted code use an Optional type hint, e.g by emitting a call to optional-or-nil.

Happy to provide a PR if needed! Keep up the great work.

Wrong method implementation of count in sql.clj

currently implemented as:

(defn count
  "grouped data count"
  [cols grouped-data]
  (.max grouped-data (into-array cols)))

It should be

(defn count
  "grouped data count"
  [cols grouped-data]
  (.count grouped-data (into-array cols)))

Adding Spark Streaming support

Hi @chrisbetz
Thanks for Sparkling, it's delicious. I just started working with it, and my employer Iris.tv is prepared to use it in production.
I am going to put some effort into Spark Streaming support, and I wonder if you have anything to share about the status of it. I see your note in the code that it is incomplete/experimental. I want to make use of it in production, if possible, so I'm going to try to get it to a point where we'd be comfortable doing that.
So, if you have a little time and you feel like explaining where things are (what works, what doesn't, what's left to be done), I would certainly appreciate it. If not, I will see what I can do on my own and send you PRs.

Serializer issues while running on Spark remote

I created some jobs using Sparkling. They run fine when I run them on "local", but they always fail on any remote Spark master of the form "spark://...".

When I started writing Spark jobs in Clojure, I found #18 and added the :aot entry proposed in #18 (comment), but this error is thrown:

...
16/05/04 12:32:57 INFO DAGScheduler: Job 1 failed: count at NativeMethodAccessorImpl.java:-2, took 0.440504 s

ClassNotFoundException sparkling.serialization.Registrator java.net.URLClassLoader.findClass (URLClassLoader.java:381)
16/05/04 12:32:57 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 27) on executor 192.168.100.11: java.io.IOException (org.apache.spark.SparkException: Failed to register classes with Kryo) [duplicate 24]
16/05/04 12:32:57 INFO TaskSetManager: Lost task 2.3 in stage 1.0 (TID 29) on executor 192.168.100.17: java.io.IOException (org.apache.spark.SparkException: Failed to register classes with Kryo) [duplicate 25]
...

The project.clj is:

(defproject operate.xcurrency.washing "0.1.0-SNAPSHOT"
  ...
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.clojure/data.json "0.2.6"]
                 [gorillalabs/sparkling "1.2.4"]
                 [org.apache.spark/spark-core_2.10 "1.6.1"]
                 [clj-time "0.8.0"]
                 [org.clojars.marsliu/clj-parsec "0.1.0-SNAPSHOT"]
                 [com.tratao.operate.core "0.1.0-SNAPSHOT"]]
  :plugins [[cider/cider-nrepl "0.12.0-SNAPSHOT"]]
  :uberjar-merge-with {"reference.conf" [slurp str spit]}
  :source-paths ["src/main/clojure"]
  :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.10 "1.6.1"]]}}
  :aot [#".*" sparkling.serialization sparkling.destructuring]
  :main operate.xcurrency.washing
  :test-paths ["src/main/clojure" "src/test/clojure"])

and there is a private library named "com.tratao.operate.core" with the following project.clj:

(defproject com.tratao.operate.core "0.1.0-SNAPSHOT"
  ...
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [org.clojure/data.json "0.2.6"]
                 [org.clojure/tools.nrepl "0.2.12"]
                 [org.clojure/tools.namespace "0.2.11"]
                 [gorillalabs/sparkling "1.2.4"]
                 [org.apache.spark/spark-core_2.10 "1.6.1"]
                 [clj-time "0.8.0"]
                 [org.clojars.marsliu/clj-parsec "0.1.0-SNAPSHOT"]]
  :plugins [[cider/cider-nrepl "0.12.0-SNAPSHOT"]]
  :source-paths ["src/main/clojure"]
  :aot [#".*" sparkling.serialization sparkling.destructuring]
  :main com.tratao.operate.server.repl
  :test-paths ["src/main/clojure" "src/test/clojure"])

If I unzip the jar and look for the Registrator class, I get:

[operate][~/DataAnalysis/jobs/xcurrent/washing]$ jar tvf target/operate.xcurrency.washing-0.1.0-SNAPSHOT-standalone.jar | grep sparkling.serialization\$register
  3027 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_base_classes.class
  2724 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register.class
   996 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_class_with_serializer.class
  1289 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_optional.class
   850 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_class.class
  1276 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_clojure.class
  2615 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_scala.class
  1483 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_native_array_serializers.class
   938 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_class_with_id.class
  2297 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_java_class_serializers.class
  1974 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_spark.class
  1138 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_class_with_serializer_and_id.class
  2531 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_array_type.class
   809 Tue May 03 17:00:48 CST 2016 sparkling/serialization$register_optional$fn__2276.class

spark/map over sql dataset

Hello! I was wondering how I can apply a spark/map function over a SQL dataset/dataframe.
Today I'm converting the dataframe to a JSON RDD, and that is a huge overhead.
Thank you!

Supported features matrix

Hello,

As a potential user I would really appreciate some kind of supported feature matrix.

Something like:

Feature   Support   Notes
foo       NO
bar       YES
baz       PARTIAL   All but quox

would go a long way in informing decisions regarding proceeding with sparkling.

Thanks

Recompile function for REPL development

Since Spark requires functions to be compiled in order to work, I wrote a little helper function that aids REPL development by recompiling the current namespace.

(defn recompile []
  (binding [*compile-files* true]
    (require (symbol (str *ns*)) :reload-all)))

Not sure if this would fit in the project anywhere, but I found it useful for speeding up my workflow, particularly when connected to an nREPL from my editor. Calling (recompile) certainly beats booting up a new REPL and reconnecting.

Sparkling incompatibility with prismatic/schema

I'm getting this error from my code:

16/06/02 15:10:30 WARN Utils: Error serializing IFn sparkling.core$ftruthy_QMARK_$fn__3467@7ce0f591
java.io.NotSerializableException: clojure.lang.Delay

whenever I run (spark/filter parse/error?) on an RDD.

The function in question is below; I've simplified it as much as possible:

(s/defn error? :- s/Bool
  [log :- s/Str]
    true)

Unable to use mongo spark

Unable to use Spark with MongoDB. Getting this error:
Exception in thread "main" java.lang.ClassCastException: Cannot cast org.apache.spark.sql.SparkSession to org.apache.spark.api.java.JavaSparkContext, compiling:(spark_mongo.clj:30:9)

My project settings:
[org.mongodb.spark/mongo-spark-connector_2.11 "2.3.2"] [gorillalabs/sparkling "2.1.3"]

Spark code:
(ns etl.sparktest
  (:require [sparkling.conf :as conf]
            [sparkling.core :as spark])
  (:import (org.apache.spark.sql SparkSession)
           (com.mongodb.spark MongoSpark)))

(def sparks
  (-> (SparkSession/builder)
      (.appName "MongoSparkConnectorIntro")
      (.master "local")
      (.config "spark.mongodb.input.uri" "mongodb://127.0.0.1/db.collection")
      (.config "spark.mongodb.output.uri" "mongodb://127.0.0.1/db.collection")
      .getOrCreate))

(def df (MongoSpark/load (sparks)))

Parallelize argument problem in Getting Started document

I'm working through http://gorillalabs.github.io/sparkling/articles/getting_started.html on a Mac running OS X 10.11.3, with a fully updated Leiningen setup, Java 8 update 71, and a CLI REPL running in a bash session. The same behavior seems to arise whether I use the project.clj file included with the download, or one I've modified to be

(defproject gorillalabs/sparkling-getting-started "1.0.0-SNAPSHOT"
            :description "A Sample project demonstrating the use of Sparkling (https://gorillalabs.github.io/sparkling/), as shown in tutorial https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html"
            :url "https://gorillalabs.github.io/sparkling/articles/tfidf_guide.html"
            :license {:name "Eclipse Public License"
                      :url  "http://www.eclipse.org/legal/epl-v10.html"}
            :dependencies [[org.clojure/clojure "1.8.0"]                      ;; changed
                           [gorillalabs/sparkling "1.2.3"]]

            :aot [#".*" sparkling.serialization sparkling.destructuring]
            :main tf-idf.core                                                    ;; VVVVV changed
            :profiles {:provided {:dependencies [[org.apache.spark/spark-core_2.10 "1.6.0"]]}
                       :dev {:plugins [[lein-dotenv "RELEASE"]]}})

Everything is fine (with some minor differences, apparently, in the return value annotations you report in your examples), until I get to

tf-idf.core=> (def data (spark/parallelize sc [1 2 3 4 5] 4))
CompilerException java.lang.ClassCastException: Cannot cast java.lang.Long to java.util.List, compiling:(form-init6916667632007495202.clj:1:11)

I admit I'm not following the call chain up all the way through the variadic function calls, but it's pretty clear this stops being a problem when the numeric slices argument (4 in the example) isn't present.
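For reference, a minimal sketch of the two variants:

;; works:
(def data (spark/parallelize sc [1 2 3 4 5]))
;; throws the ClassCastException shown above:
;; (def data (spark/parallelize sc [1 2 3 4 5] 4))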

Are slices created on demand, or does the context need to have them defined before manually assigning a slices argument here? Unclear from the walkthrough.

Support req: `take` can't be used with `map-to-pair`? [2.1.2]

I'm tooling up for some spark work; I've exported some data from Cassandra to a local file so I can test some of my logic in the REPL.

I'm hitting an initial problem:

(ns user
  (:require
    [sparkling.conf :as conf]
    [sparkling.core :as spark]
    [clojure.string :as str]))

(defn split-line
  [line]
  (let [pipex (str/index-of line \|)
        ruid (-> line
                 (subs 0 pipex)
                 str/trim)
        base64 (-> line
                   (subs (inc pipex))
                   str/trim)]
    (spark/tuple ruid base64)))

(comment

  (spark/with-context sc
                      (-> (conf/spark-conf)
                          (conf/master "local")
                          (conf/app-name "local-example"))
                      (->> (spark/text-file sc "receipts.txt")
                           (spark/take 2)
                           (spark/map-to-pair split-line)
                           #_ (spark/take 2)
                           ))

  )

fails with:

java.lang.IllegalArgumentException: No matching method found: mapToPair for class scala.collection.convert.Wrappers$SeqWrapper

It does work if I move the take call after the map-to-pair call. As I understand things, the order of take vs. map-to-pair shouldn't make a difference. Is that correct? Am I seeing a bug?
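For reference, a sketch of the ordering that does work for me:

(->> (spark/text-file sc "receipts.txt")
     (spark/map-to-pair split-line)
     (spark/take 2))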

Also, have you considered setting up a channel in the Clojurians Slack for support?

Running your code guide

Extend the guides section with documentation on how to deploy your system

  • locally
  • to a Spark Cluster
  • to YARN
  • to MESOS (sorry, no experience with that, this will be left out in the first version of the guide)

Core.matrix integration?

This is a somewhat open-ended issue, I'm interested in effective integration with core.matrix and Apache spark and how best to achieve this.

Some use cases that are potentially of interest:

  • extracting and/or viewing a spark RDD as a core.matrix array
  • using core.matrix operations within spark
  • loading core.matrix arrays into spark RDDs

Breaking change in 1.2.1

When upgrading from 1.1.1 to 1.2.1 I had to explicitly require sparkling.serialization in my code. Technically, I suppose this constitutes a breaking change, and thus the version should be 2.0.0 and not 1.2.1.

Technicalities aside, the only way I could find this out was to read the comments in the source code. I think this should be noted in the change log and also included somewhere in the guides.

Is there going to be a version that supports Spark 2.0 soon?

I've tried to run one of my existing jobs using the latest version of Spark and it failed with:

java.lang.AbstractMethodError: sparkling.function.FlatMapFunction.call(Ljava/lang/Object;)Ljava/util/Iterator;
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:124)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Looking at the release notes from Apache, this could be due to this behaviour change:

  • Java RDD’s flatMap and mapPartitions functions used to require functions returning Java Iterable. They have been updated to require functions returning Java iterator so the functions do not need to materialize all the data.

Connecting to s3n uri with authentication failing

I'm facing an error while trying to connect to an S3 bucket. I'm using
(conf/set "fs.s3.awsAccessKeyId" ....) to set the access key and the secret key, but I'm getting this error:
Execution error (IllegalArgumentException) at org.apache.hadoop.fs.s3.S3Credentials/initialize (S3Credentials.java:70).
AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
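For reference, here is a sketch of setting the s3n-prefixed properties the error message asks for (the key values are placeholders); I haven't confirmed whether this resolves it:

(-> (conf/spark-conf)
    (conf/set "fs.s3n.awsAccessKeyId" "<access-key-id>")
    (conf/set "fs.s3n.awsSecretAccessKey" "<secret-access-key>"))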

Support for modern Spark / Scala versions?

AFAICT, it's not possible to use Sparkling with modern Spark versions like 2.4.5, because they require Scala 2.11 whereas Sparkling requires Scala 2.10 in its interop.

Am I correct in this? If so, is there a plan to upgrade? I'm finding myself more and more limited by having to use Spark 2.1, all the more so as I add other library dependencies.

Local namespace symbols referenced from foreach-partition function not found.

It would appear that referencing symbols within the current namespace from the function passed to foreach-partition is broken. Oddly enough, referencing protocols works, so I originally thought it was specific to helper functions called from protocol implementations, but it turns out calling the helper function directly is broken too. These issues are reproducible when running a Spark job with spark-submit using a cluster as the master (I'm using Mesos as the master), but obviously do not repro using "local" as the master. I'm using sparkling 1.2.5.

Here's the error I'm seeing:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 6, ip-x-x-x-x.us-west-2.compute.internal): java.lang.IllegalStateException: Attempting to call unbound fn: #'sparkling-bug.core/helper-fn
  at clojure.lang.Var$Unbound.throwArity(Var.java:43)
  at clojure.lang.AFn.invoke(AFn.java:36)
  at sparkling_bug.core.MyImpl.compute(core.clj:21)
  at sparkling_bug.core$foreach_partition_is_broken$fn__551$fn__552.invoke(core.clj:34)
  at clojure.core$map$fn__4785.invoke(core.clj:2644)

Here's a code snippet:

(ns sparkling-bug.core
  (:gen-class)
  (:require
    [sparkling.core :as spark]
    [sparkling.conf :as sparkconf]))

(defn- connect
  []
  (->  (sparkconf/spark-conf)
    (sparkconf/app-name "sparkling-bug")
    (spark/spark-context)))

(defprotocol MyProtocol
  (compute [this x] "Perform some computation"))

(defn- helper-fn [x y] (+ x y))

(defrecord MyImpl [y]
  MyProtocol
  (compute [this x] (helper-fn x y)))

(defn- map-partition-works
  [sc]
  (let [input (spark/parallelize sc (range 100))
        worker (->MyImpl 7)
        result (spark/map-partition (fn [part] (map #(compute worker %) (iterator-seq part))) input)]
    (println (spark/collect result))))

(defn- foreach-partition-is-broken
  [sc]
  (let [input (spark/parallelize sc (range 100))
        worker (->MyImpl 7)]
    (spark/foreach-partition (fn [part] (println (map #(compute worker %) part))) input)))

(defn- require-fixes-the-issue
  [sc]
  (let [input (spark/parallelize sc (range 100))
        worker (->MyImpl 7)]
    (spark/foreach-partition (fn [part]
      ;; require our own namespace so that the helper fn symbol is declared on the executor
      (require 'sparkling-bug.core) 
      (println (map #(compute worker %) part))) input)))

(defn- calling-local-function-is-broken
  [sc]
  (let [input (spark/parallelize sc (range 100))]
    (spark/foreach-partition
      (fn [part]
        (println (map #(helper-fn 7 %) part))) input)))

(defn- calling-let-function-works
  [sc]
  (let [input (spark/parallelize sc (range 100))
        tmp-fn helper-fn]
    (spark/foreach-partition
      (fn [part]
        (println (map #(tmp-fn 7 %) part))) input)))

(defn -main
  "Run a very simple sparkling job."
  [& _]
  (with-open [sc (connect)]
    ;; (map-partition-works sc) ;; ok
    (foreach-partition-is-broken sc) ;; error
    ;; (require-fixes-the-issue sc) ;; ok
    ;; (calling-local-function-is-broken sc) ;; error
    ;; (calling-let-function-works sc) ;; ok
    ))

Here's my project.clj:

(defproject sparkling_bug "0.1.0-SNAPSHOT"
  :description "Example of sparkling issue calling a protocol function on a record instance."
  :dependencies [[org.clojure/clojure "1.8.0"]
                 [gorillalabs/sparkling "1.2.5"]
                 [org.apache.spark/spark-core_2.10 "1.6.1"]
                 [org.apache.spark/spark-sql_2.10 "1.6.1"]]
  :aot [#".*" sparkling.serialization sparkling.destructuring]
  :main sparkling-bug.core
  :target-path "target/%s")

Here's the command I'm using to launch:

spark-submit --master mesos://localhost:5050 --conf spark.mesos.executor.home=/opt/spark-1.6.1-bin-hadoop2.6 ./target/uberjar/sparkling_bug-0.1.0-SNAPSHOT-standalone.jar

Use with AWS Glue

Is it recommended to use sparkling with AWS Glue?

AWS Glue is a serverless stack on top of Spark provided by EMR. It has some additional features beyond Spark, but the Spark context is available as normal.

Has anyone tried integrating with Glue?

ClassNotFoundException: OpenHashMap$$anonfun$1

I get this error while compiling with:

[gorillalabs/sparkling "2.1.3"]
[org.apache.spark/spark-sql_2.12 "2.4.7"]
[org.apache.spark/spark-hive_2.12 "2.4.7"]
Caused by: java.lang.ClassNotFoundException: org.apache.spark.util.collection.OpenHashMap$$anonfun$1
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at clojure.lang.RT.classForName(RT.java:2204)
	at clojure.lang.RT.classForName(RT.java:2213)
	at sparkling.serialization$register_spark.<clinit>(serialization.clj:110)

How to perform Secondary Sort?

I have the following Scala code which I want to write using Sparkling:

case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double)

class RFMCPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.")
  override def numPartitions: Int = partitions
  override def getPartition(key: Any): Int = {
    val k = key.asInstanceOf[RFMCKey]
    k.cId.hashCode() % numPartitions
  }
}

object RFMCKey {
  implicit def orderingByIdAirportIdDelay[A <: RFMCKey]: Ordering[A] = {
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1))
  }
}

How can I do this? If I can, how do I use this for secondary sorting using sparkling? i.e. I want to perform data.repartitionAndSortWithinPartitions(new RFMCPartitioner(partitions)).
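I haven't found a Sparkling wrapper for this, so a rough, untested sketch of what I have in mind via Java interop on the underlying JavaPairRDD follows; the vector-shaped key [cid r f m c] replaces the RFMCKey case class, and pair-rdd and partitions are placeholders:

;; untested sketch: keys are assumed to be Clojure vectors [cid r f m c]
(defn rfmc-partitioner [n]
  (proxy [org.apache.spark.Partitioner] []
    (numPartitions [] (int n))
    (getPartition [key]
      ;; partition by customer id, like RFMCPartitioner above
      (int (mod (Math/abs (hash (first key))) n)))))

(def rfmc-comparator
  ;; R ascending, then F/M/C descending, mirroring the implicit Ordering above
  (reify java.util.Comparator
    (compare [_ a b]
      (compare [(nth a 1) (- (nth a 2)) (- (nth a 3)) (- (nth a 4))]
               [(nth b 1) (- (nth b 2)) (- (nth b 3)) (- (nth b 4))]))))

;; pair-rdd is assumed to be a JavaPairRDD keyed by such vectors:
;; (.repartitionAndSortWithinPartitions pair-rdd (rfmc-partitioner partitions) rfmc-comparator)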

subvectors don't serialize properly

This works:

(testing
  "we can serialize and deserialize vectors"
  (is (= [1 2 3 4 5]
         (ks/round-trip kryo [1 2 3 4 5]))))

But this does not, it fails with the exception included at the end:

(testing
  "we can serialize and deserialize sub-vectors"
  (is (= [2 3 4]
         (ks/round-trip kryo (subvec [1 2 3 4 5] 1 4)))))

I tried registering IPersistentVector, IPersistentMap and APersistentVector$SubVector but that didn't help. Don't know enough about how the serialization works to understand why vectors work as expected but subvectors don't.

Stack trace:

lein test :only sparkling.serialization-test/new-registrator

ERROR in (new-registrator) (APersistentVector.java:359)
we can serialize and deserialize sub-vectors
expected: (= [2 3 4] (ks/round-trip kryo (subvec [1 2 3 4 5] 1 4)))
actual: java.lang.UnsupportedOperationException: null
at clojure.lang.APersistentVector.add (APersistentVector.java:359)
com.esotericsoftware.kryo.serializers.CollectionSerializer.read (CollectionSerializer.java:109)
com.esotericsoftware.kryo.serializers.CollectionSerializer.read (CollectionSerializer.java:18)
com.esotericsoftware.kryo.Kryo.readClassAndObject (Kryo.java:729)
carbonite.buffer$read_bytes.invoke (buffer.clj:89)
sparkling.kryoserializer$round_trip.invoke (kryoserializer.clj:15)
sparkling.serialization_test/fn (serialization_test.clj:34)
clojure.test$test_var$fn__7670.invoke (test.clj:704)
clojure.test$test_var.invoke (test.clj:704)
clojure.test$test_vars$fn__7692$fn__7697.invoke (test.clj:722)
clojure.test$default_fixture.invoke (test.clj:674)
clojure.test$test_vars$fn__7692.invoke (test.clj:722)
clojure.test$default_fixture.invoke (test.clj:674)
clojure.test$test_vars.invoke (test.clj:718)
clojure.test$test_all_vars.invoke (test.clj:728)
clojure.test$test_ns.invoke (test.clj:747)
clojure.core$map$fn__4553.invoke (core.clj:2624)
clojure.lang.LazySeq.sval (LazySeq.java:40)
clojure.lang.LazySeq.seq (LazySeq.java:49)
clojure.lang.Cons.next (Cons.java:39)
clojure.lang.RT.next (RT.java:674)
clojure.core/next (core.clj:64)
clojure.core$reduce1.invoke (core.clj:909)
clojure.core$reduce1.invoke (core.clj:900)
clojure.core$merge_with.doInvoke (core.clj:2936)
clojure.lang.RestFn.applyTo (RestFn.java:139)
clojure.core$apply.invoke (core.clj:632)
clojure.test$run_tests.doInvoke (test.clj:762)
clojure.lang.RestFn.applyTo (RestFn.java:137)
clojure.core$apply.invoke (core.clj:630)
user$eval87$fn__146$fn__177.invoke (form-init6084764228854399442.clj:1)
user$eval87$fn__146$fn__147.invoke (form-init6084764228854399442.clj:1)
user$eval87$fn__146.invoke (form-init6084764228854399442.clj:1)
user$eval87.invoke (form-init6084764228854399442.clj:1)
clojure.lang.Compiler.eval (Compiler.java:6782)
clojure.lang.Compiler.eval (Compiler.java:6772)
clojure.lang.Compiler.load (Compiler.java:7227)
clojure.lang.Compiler.loadFile (Compiler.java:7165)
clojure.main$load_script.invoke (main.clj:275)
clojure.main$init_opt.invoke (main.clj:280)
clojure.main$initialize.invoke (main.clj:308)
clojure.main$null_opt.invoke (main.clj:343)
clojure.main$main.doInvoke (main.clj:421)
clojure.lang.RestFn.invoke (RestFn.java:421)
clojure.lang.Var.invoke (Var.java:383)
clojure.lang.AFn.applyToHelper (AFn.java:156)
clojure.lang.Var.applyTo (Var.java:700)
clojure.main.main (main.java:37)
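A workaround I may try (untested): copy the subvector into a regular vector before it hits Kryo, since full vectors round-trip fine:

(ks/round-trip kryo (vec (subvec [1 2 3 4 5] 1 4)))   ;; vec copies the subvector into a plain PersistentVector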

A runtime error when Sparkling is used with Ring, Compojure and Elasticsearch-Hadoop

We get a run time error* when we're trying to use Sparkling in an existing project. I've reproduced the same error in this fork of sparkling-getting-started example. The failure is caused by this tiny commit.

The example failing project works fine when I remove Sparkling specific parts. I've demonstrated this in a branch: without-sparkling

How could we fix this problem if we want to keep ring.middleware?


* The error is: java.lang.RuntimeException: No such var: head/head-response, compiling:(ring/middleware/resource.clj:16:11)

The stacktrace we get could be seen in stack-trace.txt.

Creating a stand-alone jar file from Sparkling

Hello,

I just cloned the Sparkling package and tried to create a standalone jar file using lein uberjar. I see that none of the Clojure files have a :gen-class keyword, so, just for a test, I put a simple helloworld.clj in the sparkling folder with the following content:
(ns sparkling.helloworld
  (:gen-class))

(defn -main [& args])

When I run lein uberjar, it creates the jar file but does not generate any class for helloworld.clj; there is just the .clj file under the clojure/sparkling path. I also added :uberjar {:aot :all} to project.clj, but the problem still exists. Is there any trick I have to apply to create the jar file? I want to run the Spark job on an EMR cluster, and to do this I need to be able to create a stand-alone JAR file with all dependencies. Your comments and advice would be really appreciated.

With best regards,

Ali

Kryo serialization error

Hello! I want to use Sparkling with deeplearning4j, and when I try to configure Kryo serialization according to this

(defn make-spark-context []
  (let [c (-> (conf/spark-conf)
              (conf/master "local[*]")
              (conf/app-name "clojure job")
              (conf/set "spark.driver.allowMultipleContexts" "false")
              (conf/set "spark.executor.memory" "4g")
              (conf/set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")
              (conf/set "spark.kryo.registrator" "org.nd4j.Nd4jRegistrator"))]
    (spark/spark-context c)))

I create a SQL dataset and then call

(sql/show data)

And then I get this error:

ERROR executor.Executor:91 - Exception in task 0.0 in stage 5.0 (TID 216)
java.lang.UnsupportedOperationException
	at clojure.lang.APersistentVector.add(APersistentVector.java:372)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:134)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
	at org.apache.spark.serializer.DeserializationStream.readKey(Serializer.scala:157)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:189)
	at org.apache.spark.serializer.DeserializationStream$$anon$2.getNext(Serializer.scala:186)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
	at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:91)
	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

UnsupportedOperationException   clojure.lang.APersistentVector.add (APersistentVector.java:372)

How can I fix this?
