
spark-testing-base's Introduction


spark-testing-base

Base classes to use when writing tests with Spark.

Why?

You've written an awesome program in Spark and now it's time to write some tests. Only you find yourself writing the code to set up and tear down local mode Spark in between each suite and you say to yourself: This is not my beautiful code.

How?

So you include com.holdenkarau.spark-testing-base [spark_version]_1.4.0, extend one of the classes, and write some simple tests instead. For example, to include this in a project using Spark 3.0.0:

"com.holdenkarau" %% "spark-testing-base" % "3.0.0_1.4.0" % "test"

or

<dependency>
	<groupId>com.holdenkarau</groupId>
	<artifactId>spark-testing-base_2.12</artifactId>
	<version>${spark.version}_1.4.0</version>
	<scope>test</scope>
</dependency>

How do you use it inside your code? Have a look at the wiki page.
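
As a rough illustration of "extend one of the classes", the sketch below mixes SharedSparkContext into a ScalaTest suite. It assumes ScalaTest 3.1 or later (for AnyFunSuite) is on the test classpath; the suite name WordLengthTest and the test data are made up for this example, and the wiki remains the reference for the full set of base classes.

```scala
import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.funsuite.AnyFunSuite

// Hypothetical suite name. SharedSparkContext supplies `sc`, a local-mode
// SparkContext, so the suite does not have to set one up or tear it down.
class WordLengthTest extends AnyFunSuite with SharedSparkContext {
  test("map keeps the number of elements") {
    val words = List("spark", "testing", "base")
    val lengths = sc.parallelize(words).map(_.length)
    assert(lengths.count() == words.length.toLong)
  }
}
```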

The Maven repositories page for spark-testing-base lists the releases available.
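
The sbt and Maven coordinates shown earlier refer to the same artifact. A small build.sbt sketch of why, assuming the project's scalaVersion is a 2.12.x release:

```scala
// build.sbt, assuming scalaVersion := "2.12.x"

// %% appends the Scala binary version (_2.12 here) to the artifact name...
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "3.0.0_1.4.0" % "test"

// ...so it resolves the same artifact as this explicit form, which matches
// the Maven artifactId spark-testing-base_2.12:
// libraryDependencies += "com.holdenkarau" % "spark-testing-base_2.12" % "3.0.0_1.4.0" % "test"
```

Maven has no equivalent of %%, which is why the Scala version has to be written into the artifactId there.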

The Python package of spark-testing-base is available via:

Minimum Memory Requirements and OOMs

The default SBT testing java options are too small to support running many of the tests due to the need to launch Spark in local mode. To increase the amount of memory in a build.sbt file you can add:

fork in Test := true
javaOptions ++= Seq("-Xms8G", "-Xmx8G", "-XX:MaxPermSize=4048M", "-XX:+CMSClassUnloadingEnabled")

Note: if you're running on JDK 17+, MaxPermSize and CMSClassUnloadingEnabled have been removed, so it becomes:

fork in Test := true
javaOptions ++= Seq("-Xms8G", "-Xmx8G")

If using surefire you can add:

<argLine>-Xmx2048m -XX:MaxPermSize=2048m</argLine>

Note: the specific memory values are examples only (and the values used to run spark-testing-base's own tests).

Special considerations

Make sure to disable parallel execution.

In sbt you can add:

parallelExecution in Test := false

In surefire make sure that forkCount is set to 1 and reuseForks is true.

If you're testing Spark SQL CodeGen, make sure to set SPARK_TESTING=true.
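
Put together, the sbt settings from this section and the previous one might look like the sketch below. It uses the sbt 1.x slash syntax rather than the `in Test` form shown above, scopes javaOptions to Test (an assumption; the snippets above leave it unscoped), and uses the JDK 17+ variant of the flags. The memory values are examples only.

```scala
// build.sbt (sbt 1.x slash syntax)

// Run tests in a separate JVM so that javaOptions and envVars take effect.
Test / fork := true

// Example heap sizes only; tune them for your own suites.
Test / javaOptions ++= Seq("-Xms8G", "-Xmx8G")

// Run one Spark suite at a time.
Test / parallelExecution := false

// Environment variables are only passed to forked test JVMs.
Test / envVars := Map("SPARK_TESTING" -> "true")
```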

Codegen tests and Running Spark Testing Base's own tests

If you are testing codegen, it's important to have SPARK_TESTING set to yes, as we do in our GitHub Actions.

SPARK_TESTING=yes ./build/sbt clean +compile +test -DsparkVersion=$SPARK_VERSION

Where is this from?

Some of this code is a stripped down version of the test suite bases that are in Apache Spark but are not accessible. Other parts are also inspired by sscheck (scalacheck generators for Spark).

Other parts of this are implemented on top of the test suite bases to make your life even easier.

How do I build this?

This project is built with sbt.

What are some other options?

While we hope you choose our library, https://github.com/juanrh/sscheck , https://github.com/hammerlab/spark-tests , https://github.com/wdm0006/DummyRDD , and more https://www.google.com/search?q=python+spark+testing+libraries exist as options.

Security Disclosure e-mails

Have you found a security concern? Please let us know.

See https://github.com/holdenk/spark-testing-base/blob/main/SECURITY.md

spark-testing-base's People

Contributors

ac27182, bryanyang0528, chiefmanc, dbast, eruizalo, hgiddens, holdenk, jerrypnz, joshrosen, juanrh, kaatzee, limansky, mahmoudhanafy, mandoz, markdessain, morazow, mrpowers, nightscape, pchundi, ponkin, rgarciate, rylanhalteman, sathiyapk, scala-steward, sgt, siklosid, smadarasmi, smiklos, wosin, zouzias


spark-testing-base's Issues

multisets dependency locked to _2.10 version

In build.sbt, please replace the line

"io.github.nicolasstucki" % "multisets_2.10" % "0.1",

with

"io.github.nicolasstucki" %% "multisets" % "0.3",

Otherwise, useSets support is broken on Scala 2.11.

Errors when running several test suites from sbt with parallelExecution := true

Hi,

When I run all the tests from a project with sbt test and I have more than one test suite involving Spark, I get execution errors. This can be reproduced, for example, by running sbt test with https://github.com/juanrh/spark-testing-base after commenting out parallelExecution := false in build.sbt. I guess this has to do with https://issues.apache.org/jira/browse/SPARK-2243, and it looks like a non-trivial problem with Spark. I tried to use a trait and companion StaticSparkContext to use just a single SparkContext for the whole JVM, but then we wouldn't be able to use different clock configurations for different test suites, as there would be just a single SparkConf for the whole JVM.

So for now, using parallelExecution := false in build.sbt is a workaround, but I thought it would be nice to open an issue in case https://issues.apache.org/jira/browse/SPARK-2243 is solved in the future and concurrent execution of Spark test suites becomes possible. Anyway, I would like to hear your thoughts about this.

Thanks again for this nice spark package.

Greetings,

Juan

Serialization for Mockito improvement

One single line can save some serialization pain when using Mockito. Perhaps it makes sense to create a mixin for StreamingSuiteCommon with MockitoSugar and this:

override def mock[T <: AnyRef](implicit manifest: Manifest[T]): T = super.mock[T](withSettings().serializable())

Otherwise you get the following error:

Caused by: java.io.NotSerializableException: org.mockito.internal.creation.DelegatingMethod
Serialization stack:
    - object not serializable (class: org.mockito.internal.creation.DelegatingMethod, value: org.mockito.internal.creation.DelegatingMethod@a97f2bff)
    - field (class: org.mockito.internal.invocation.InvocationImpl, name: method, type: interface org.mockito.internal.invocation.MockitoMethod)
...

Spark Streaming and KafkaUtils

Great project! Thanks.
Are there any plans to move KafkaUtils for tests (Spark Streaming) into this project?
P.S. we actually did it already, and I can contribute it if you tell me how.
Thanks in advance!

Test case for DStream.countByWindow

Hi,
I am trying to write a test case for DStream.countByWindow using StreamingSuiteBase.

test("CountByWindow with windowDuration 3s and slideDuration=2s") {
  // There should be 2 windows: {batch2, batch1}, {batch4, batch3, batch2}
  val batch1 = List("a", "b")
  val batch2 = List("d", "f", "a")
  val batch3 = List("f", "g", "h")
  val batch4 = List("a")
  val input = List(batch1, batch2, batch3, batch4)
  val expected = List(List(5L), List(7L))
  def countByWindow(ds: DStream[String]): DStream[Long] = {
    ds.countByWindow(windowDuration = Seconds(3), slideDuration = Seconds(2))
  }
  testOperation[String, Long](input, countByWindow _, expected)
}

But I get a timeout exception when I run it.

Testing started at 15:31 ...
16/02/06 15:31:44 INFO [ScalaTest-run] WindowTest: Using manual clock
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: numBatches = 2, numExpectedOutput = 2
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: Manual clock before advancing = 0
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: Manual clock after advancing = 2000
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 0, numExpectedOutput = 2
16/02/06 15:31:46 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 0, numExpectedOutput = 2
....
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: output.size = 1, numExpectedOutput = 2
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: Output generated in 10022 milliseconds
16/02/06 15:31:56 INFO [ScalaTest-run-running-WindowTest] WindowTest: [5]

assertion failed: Operation timed out after 10022 ms
java.lang.AssertionError: assertion failed: Operation timed out after 10022 ms
    at scala.Predef$.assert(Predef.scala:179)


Is my setup correct? (I cannot find an example of testing DStream.countByWindow in the test cases.)

Thanks in advance for your assistance!
Shing
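
For reference, the expected values 5 and 7 in the test above can be checked by hand. The sketch below is plain Scala with no Spark involved; it assumes a batch interval of 1 second (the issue does not state it) and only reproduces the window arithmetic. It does not explain the timeout. The object and method names are made up for this example.

```scala
object WindowCounts {
  // batchSizes(i) is the number of elements in batch i + 1.
  // window and slide are measured in batches (1 batch = 1 second here).
  def countByWindow(batchSizes: Seq[Int], window: Int, slide: Int): Seq[Long] =
    (slide to batchSizes.length by slide).map { end =>
      // A window ending after batch `end` covers the last `window` batches,
      // or fewer if the stream has not produced that many yet.
      val start = math.max(0, end - window)
      batchSizes.slice(start, end).map(_.toLong).sum
    }
}

// Batches of 2, 3, 3 and 1 elements, window of 3 batches, slide of 2 batches:
// the first window holds batches 1-2, the second holds batches 2-4.
val counts = WindowCounts.countByWindow(Seq(2, 3, 3, 1), window = 3, slide = 2)
```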

Support for spark 1.4.1

Great library, thanks for all the hard work!

Looks like Spark is up to 1.4.x. Any chance of a release that supports it?

Add DataFrame generator for ScalaTest

Similar to how we have an RDDGenerator we should add DataFrame generators for use with ScalaTest.

As part of this, it would likely be useful to have a function that takes a DataFrame schema and turns it into a basic set of generators for the DataFrame generator to use, which the user could then customize. For example, if their DataFrame had Strings and Integers, the function would take in the schema and produce a Map from column name to a string generator or an integer generator. If the user wanted to add some constraints (e.g. non-empty strings), they could do that rather than having to specify everything. This could also be done as a separate follow-up issue.

code coverage reporting is broken for Scala/Java code

Code coverage reporting is broken for Scala/Java. Take a look and try to figure out why (fixing the Java tests may have caused this issue). We should either:

  • fix this
  • disable code coverage reporting until fixed

Don't inherit from FunSuite in the StreamingSuite helpers.

Doing so prevents them from being mixed into suites written in ScalaTest's FunSpec style.
My team has chosen FunSpec over FunSuite.

You can probably get away with inheriting from Suite or simply specifying a self type annotation on the 2 Streaming traits, just like you're doing on SharedSparkContext.

For your own tests (e.g. SampleStreamingActionTest), you can change them to inherit from FunSuite.

I tested this in a local branch and the changes seem minimal. I can send over a PR if that's easier.

Defining Equality object used in StreamingSuiteBase.verifyOutput

I wonder whether it would be possible to pass my own Equality object to be used in StreamingSuiteBase for the triple-equals comparison.
If not, could you please provide an example of how to use testOperation when some fields of expectedOutput are unknown or unimportant (e.g. time-dependent)?

ValueError: please use 'pytest' pypi package instead of 'py.test'

Hi,

I have tried installing the testing framework locally with Python version 2.7.

pchundi@uvbox:~/IdeaProjects/spark-testing-base/python$ sudo python setup.py install
/usr/lib/python2.7/distutils/dist.py:267: UserWarning: Unknown distribution option: 'test_requires'
  warnings.warn(msg)
running install
running bdist_egg
running egg_info
writing requirements to sparktestingbase.egg-info/requires.txt
writing sparktestingbase.egg-info/PKG-INFO
writing top-level names to sparktestingbase.egg-info/top_level.txt
writing dependency_links to sparktestingbase.egg-info/dependency_links.txt
reading manifest file 'sparktestingbase.egg-info/SOURCES.txt'
writing manifest file 'sparktestingbase.egg-info/SOURCES.txt'
installing library code to build/bdist.linux-x86_64/egg
running install_lib
running build_py
creating build/bdist.linux-x86_64/egg
creating build/bdist.linux-x86_64/egg/sparktestingbase
copying build/lib.linux-x86_64-2.7/sparktestingbase/streamingtestcase.py -> build/bdist.linux-x86_64/egg/sparktestingbase
copying build/lib.linux-x86_64-2.7/sparktestingbase/__init__.py -> build/bdist.linux-x86_64/egg/sparktestingbase
creating build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/test/reuse_spark_context_test.py -> build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/test/helloworld_test.py -> build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/test/simple_streaming_test.py -> build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/test/__init__.py -> build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/test/simple_test.py -> build/bdist.linux-x86_64/egg/sparktestingbase/test
copying build/lib.linux-x86_64-2.7/sparktestingbase/testcase.py -> build/bdist.linux-x86_64/egg/sparktestingbase
copying build/lib.linux-x86_64-2.7/sparktestingbase/utils.py -> build/bdist.linux-x86_64/egg/sparktestingbase
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/streamingtestcase.py to streamingtestcase.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/__init__.py to __init__.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/test/reuse_spark_context_test.py to reuse_spark_context_test.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/test/helloworld_test.py to helloworld_test.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/test/simple_streaming_test.py to simple_streaming_test.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/test/__init__.py to __init__.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/test/simple_test.py to simple_test.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/testcase.py to testcase.pyc
byte-compiling build/bdist.linux-x86_64/egg/sparktestingbase/utils.py to utils.pyc
creating build/bdist.linux-x86_64/egg/EGG-INFO
copying sparktestingbase.egg-info/PKG-INFO -> build/bdist.linux-x86_64/egg/EGG-INFO
copying sparktestingbase.egg-info/SOURCES.txt -> build/bdist.linux-x86_64/egg/EGG-INFO
copying sparktestingbase.egg-info/dependency_links.txt -> build/bdist.linux-x86_64/egg/EGG-INFO
copying sparktestingbase.egg-info/requires.txt -> build/bdist.linux-x86_64/egg/EGG-INFO
copying sparktestingbase.egg-info/top_level.txt -> build/bdist.linux-x86_64/egg/EGG-INFO
zip_safe flag not set; analyzing archive contents...
creating 'dist/sparktestingbase-0.0.7_snapshot-py2.7.egg' and adding 'build/bdist.linux-x86_64/egg' to it
removing 'build/bdist.linux-x86_64/egg' (and everything under it)
Processing sparktestingbase-0.0.7_snapshot-py2.7.egg
Removing /usr/local/lib/python2.7/dist-packages/sparktestingbase-0.0.7_snapshot-py2.7.egg
Copying sparktestingbase-0.0.7_snapshot-py2.7.egg to /usr/local/lib/python2.7/dist-packages
sparktestingbase 0.0.7-snapshot is already the active version in easy-install.pth

Installed /usr/local/lib/python2.7/dist-packages/sparktestingbase-0.0.7_snapshot-py2.7.egg
Processing dependencies for sparktestingbase==0.0.7-snapshot
Searching for py.test
Reading https://pypi.python.org/simple/py.test/
Best match: py.test 0.0
Downloading https://pypi.python.org/packages/source/p/py.test/py.test-0.0.tar.gz#md5=9a5bcfc8ad06500ddd3a0fe3aa7965f6
Processing py.test-0.0.tar.gz
Writing /tmp/easy_install-z5O08t/py.test-0.0/setup.cfg
Running py.test-0.0/setup.py -q bdist_egg --dist-dir /tmp/easy_install-z5O08t/py.test-0.0/egg-dist-tmp-dOfqou
Traceback (most recent call last):
  File "setup.py", line 22, in <module>
    'unittest2'
  File "/usr/lib/python2.7/distutils/core.py", line 151, in setup
    dist.run_commands()
  File "/usr/lib/python2.7/distutils/dist.py", line 953, in run_commands
    self.run_command(cmd)
  File "/usr/lib/python2.7/distutils/dist.py", line 972, in run_command
    cmd_obj.run()
  File "/usr/lib/python2.7/dist-packages/setuptools/command/install.py", line 73, in run
    self.do_egg_install()
  File "/usr/lib/python2.7/dist-packages/setuptools/command/install.py", line 96, in do_egg_install
    cmd.run()
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 381, in run
    self.easy_install(spec, not self.no_deps)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 597, in easy_install
    return self.install_item(None, spec, tmpdir, deps, True)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 648, in install_item
    self.process_distribution(spec, dist, deps)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 694, in process_distribution
    [requirement], self.local_index, self.easy_install
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 620, in resolve
    dist = best[req.key] = env.best_match(req, ws, installer)
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 858, in best_match
    return self.obtain(req, installer) # try and download/install
  File "/usr/lib/python2.7/dist-packages/pkg_resources.py", line 870, in obtain
    return installer(requirement)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 616, in easy_install
    return self.install_item(spec, dist.location, tmpdir, deps)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 646, in install_item
    dists = self.install_eggs(spec, download, tmpdir)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 834, in install_eggs
    return self.build_and_install(setup_script, setup_base)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 1040, in build_and_install
    self.run_setup(setup_script, setup_base, args)
  File "/usr/lib/python2.7/dist-packages/setuptools/command/easy_install.py", line 1025, in run_setup
    run_setup(setup_script, args)
  File "/usr/lib/python2.7/dist-packages/setuptools/sandbox.py", line 50, in run_setup
    lambda: execfile(
  File "/usr/lib/python2.7/dist-packages/setuptools/sandbox.py", line 100, in run
    return func()
  File "/usr/lib/python2.7/dist-packages/setuptools/sandbox.py", line 52, in <lambda>
    {'__file__':setup_script, '__name__':'__main__'}
  File "setup.py", line 6, in <module>
    author='Holden Karau',
ValueError: please use 'pytest' pypi package instead of 'py.test'

Should I install any dependencies before installing the framework?

Multiple Tests Per Test Class -> NPE

Thanks for this library! It will be great to have squeaky-clean tests for our streaming apps. The team here loves the "This is not my beautiful code" reference too.

I'm trying out 1.3.0_0.0.5 this morning and it works great for a single test. With multiple tests in a single class, the first test passes, while the second and later tests fail like this. Is this expected?

java.lang.NullPointerException
    at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:177)
    at org.apache.spark.streaming.TestStreamingContext.<init>(TestStreamingContext.scala:24)
    at org.apache.spark.streaming.TestStreamingContext.<init>(TestStreamingContext.scala:35)
    at com.holdenkarau.spark.testing.StreamingSuiteBase$class.setupStreams(StreamingSuiteBase.scala:165)
    at com.bigco.spark.BotDetectorSpec.setupStreams(BotDetectorSpec.scala:13)
    at com.holdenkarau.spark.testing.StreamingSuiteBase$class.testOperation(StreamingSuiteBase.scala:321)
    at com.bigco.spark.BotDetectorSpec.testOperation(BotDetectorSpec.scala:13)
    at com.holdenkarau.spark.testing.StreamingSuiteBase$class.testOperation(StreamingSuiteBase.scala:300)
    at com.bigco.spark.BotDetectorSpec.testOperation(BotDetectorSpec.scala:13)
    at com.bigco.spark.BotDetectorSpec$$anonfun$2.apply$mcV$sp(BotDetectorSpec.scala:53)

Got java.io.NotSerializableException when trying to get sample from RDD

When trying to run this code:

  test("assert that two methods on the RDD have the same results") {
    val stringRDDGenerator = RDDGenerator.genRDD[String](sc)
    println("Sample Count: " + stringRDDGenerator.sample.get.isEmpty())

    forAll(stringRDDGenerator){
      rdd => RDDComparisons.compare(filterOne(rdd),
        filterOther(rdd)).isEmpty
    }
  }

I got this exception:


Job aborted due to stage failure: Failed to serialize task 0, not attempting to retry it. Exception during serialization: java.io.NotSerializableException: org.scalacheck.Gen$$anon$6
Serialization stack:
    - object not serializable (class: org.scalacheck.Gen$$anon$6, value: org.scalacheck.Gen$$anon$6@61225a52)
    - field (class: com.holdenkarau.spark.testing.WrappedGenerator, name: generator, type: interface org.scalacheck.Gen)
    - object (class com.holdenkarau.spark.testing.WrappedGenerator, com.holdenkarau.spark.testing.WrappedGenerator@3e776e3d)
    - field (class: org.apache.spark.mllib.rdd.RandomRDDPartition, name: generator, type: interface org.apache.spark.mllib.random.RandomDataGenerator)
    - object (class org.apache.spark.mllib.rdd.RandomRDDPartition, org.apache.spark.mllib.rdd.RandomRDDPartition@0)
    - field (class: org.apache.spark.scheduler.ResultTask, name: partition, type: interface org.apache.spark.Partition)
    - object (class org.apache.spark.scheduler.ResultTask, ResultTask(0, 0))

But if I don't take that sample, it works fine!

So I'm wondering why the property check works fine and never throws that exception.

Nice to have more types in DataFrameSuiteBaseLike.approxEquals

At present the following test involving an Array will fail.

val arr1 = Array("a","b")
val arr2 = Array("a","b")
val r1 = Row(arr1)
val r2 = Row(arr2)

assert(DataFrameSuiteBase.approxEquals(r1,r2, 0.0))

This is because the comparison would use arr1 != arr2, which is true.
The comparison should use !arr1.sameElements(arr2).

Maybe DataFrameSuiteBaseLike.approxEquals should also cater for Array[T], where T is String, Float, Long, or Double.

Shing
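
The behaviour described above can be seen in plain Scala, without Spark or Row. The sketch below also includes a tolerance-based helper for Array[Double]; that helper, and all names in the block, are hypothetical illustrations and not part of spark-testing-base.

```scala
object ArrayEqualityDemo {
  val arr1 = Array("a", "b")
  val arr2 = Array("a", "b")

  // Scala arrays are JVM arrays: == and != compare references, not contents.
  val differByReference: Boolean = arr1 != arr2

  // sameElements compares the contents element by element.
  val sameContents: Boolean = arr1.sameElements(arr2)

  // Hypothetical helper: element-wise comparison of Double arrays
  // within an absolute tolerance.
  def approxSameElements(a: Array[Double], b: Array[Double], tol: Double): Boolean =
    a.length == b.length &&
      a.zip(b).forall { case (x, y) => math.abs(x - y) <= tol }
}
```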

typo in sbt dependency

Hi Holden,
I think the sbt dependency string in the documentation is incorrect? i.e.

"com.holdenkarau" % "spark-testing-base" %% "1.3.0_0.2.0"

What worked for me was:

"com.holdenkarau" %% "spark-testing-base" % "1.3.0_0.2.0"

README Maven Dependency Wrong

The readme's Maven dependency example is wrong. The Scala version needs to be appended to the artifactId. That section should be:

    <dependency>
        <groupId>com.holdenkarau</groupId>
        <artifactId>spark-testing-base_2.10</artifactId>
        <version>${spark.version}_0.3.2</version>
        <scope>test</scope>
    </dependency>

Why is useSet needed in testOperation?

Hi,

Thanks a lot for this package, I find it very interesting.

The thing is, I've been taking a look at StreamingSuiteBase.testOperation, and I don't understand how useSet works. I understand the input and output are specified as a Seq of Seq, corresponding to a series of RDDs for a prefix of a DStream. Hence each internal Seq is a micro batch. From what I see in verifyOutput, I understand that when useSet is true each micro batch is treated like a set, and when it is false each micro batch is treated like a list. But I understand an RDD is a distributed implementation of a multiset, so I don't get how either of these two interpretations could make sense. Maybe I'm missing something?

Thanks a lot in advance,

Greetings,

Juan
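
The three interpretations discussed in this question can be contrasted in plain Scala. This is only an illustration of list, set, and multiset comparison on a single micro batch; it is not how verifyOutput is implemented, and the names are made up for this example.

```scala
object BatchComparison {
  // Count occurrences so that duplicates matter but order does not.
  def asMultiset[T](batch: Seq[T]): Map[T, Int] =
    batch.groupBy(identity).map { case (k, v) => k -> v.size }

  val expected = Seq("a", "b", "a")
  val reordered = Seq("a", "a", "b") // same elements, different order
  val deduped = Seq("a", "b")        // one "a" is missing

  // List comparison is order-sensitive, so a reordered batch does not match.
  val listMatchesReordered: Boolean = expected == reordered

  // Set comparison ignores duplicates, so a batch with a missing "a" still matches.
  val setMatchesDeduped: Boolean = expected.toSet == deduped.toSet

  // Multiset comparison accepts the reordering and rejects the missing duplicate.
  val multisetMatchesReordered: Boolean = asMultiset(expected) == asMultiset(reordered)
  val multisetMatchesDeduped: Boolean = asMultiset(expected) == asMultiset(deduped)
}
```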

enable spark for 1.2

Hi holdenk,

Unfortunately, I need to test against Spark 1.2. Please re-enable cross-builds for this older version.
I appreciate your help.

DataFrameSuiteBase not working

I tried to extend my ScalaTest suite with DataFrameSuiteBase, but sqlContext cannot be resolved to SQLContext because it has type Any. Does anyone have the same issue?

equalDataFrames should not call DataFrameSuiteBaseLike.approxEquals directly.

Hi,
I am using
"com.holdenkarau" %% "spark-testing-base" % "1.5.2_0.3.0"

In the trait DataFrameSuiteBaseLike, there is the method
def approxEquals(r1: Row, r2: Row, tol: Double): Boolean = {
  DataFrameSuiteBase.approxEquals(r1, r2, tol)
}

https://github.com/holdenk/spark-testing-base/blob/master/src/main/1.3/scala/com/holdenkarau/spark/testing/DataFrameSuiteBase.scala

The methods DataFrameSuiteBaseLike.equalDataFrames and approxEqualDataFrames call DataFrameSuiteBase.approxEquals(r1, r2, tol) directly.

Shouldn't DataFrameSuiteBaseLike.equalDataFrames and approxEqualDataFrames call DataFrameSuiteBaseLike.approxEquals?

As it is, overriding DataFrameSuiteBaseLike.approxEquals has no effect.

Shing
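
The dispatch problem described in this issue can be reproduced in a few lines of plain Scala. All names below (Comparer, ComparerLike, Lenient, equalDirect, equalViaHook) are hypothetical stand-ins for the object and trait mentioned above, and the sketch compares Doubles instead of Rows.

```scala
// Stand-in for the companion-style object holding the default comparison.
object Comparer {
  def approxEquals(a: Double, b: Double, tol: Double): Boolean =
    math.abs(a - b) <= tol
}

trait ComparerLike {
  // Overridable hook that defaults to the object's implementation.
  def approxEquals(a: Double, b: Double, tol: Double): Boolean =
    Comparer.approxEquals(a, b, tol)

  // Calls the object directly, so an override of approxEquals is never consulted.
  def equalDirect(xs: Seq[Double], ys: Seq[Double], tol: Double): Boolean =
    xs.length == ys.length &&
      xs.zip(ys).forall { case (x, y) => Comparer.approxEquals(x, y, tol) }

  // Calls the trait method, so an override takes effect.
  def equalViaHook(xs: Seq[Double], ys: Seq[Double], tol: Double): Boolean =
    xs.length == ys.length &&
      xs.zip(ys).forall { case (x, y) => approxEquals(x, y, tol) }
}

// An override that treats every pair as equal, to make the difference visible.
object Lenient extends ComparerLike {
  override def approxEquals(a: Double, b: Double, tol: Double): Boolean = true
}
```

With these definitions, Lenient.equalDirect(Seq(1.0), Seq(2.0), 0.0) ignores the override and returns false, while Lenient.equalViaHook with the same arguments returns true.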

some tests got oom error

Hi holdenk,
When I tried to run the test following the wiki page "DataFrameSuiteBase" (https://github.com/holdenk/spark-testing-base/wiki/DataFrameSuiteBase), I got an OOM error, but the other test (DataFrameGenerator) is OK.
Here is my environment:
os: centos 6.4
machine memory: 8G
jdk: 1.7.0_79
maven: apache-maven-3.3.9
MAVEN_OPTS:-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m
spark: spark-1.6.0

And this is the error:
An exception or error caused a run to abort. This may have been caused by a problematic custom reporter.

Exception in thread "ScalaTest-main" 
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "ScalaTest-main"

thanks

Publish Scala 2.11 Builds to Maven Repo

Seems like it would be consistent with spark-core to publish both 2.10 and 2.11 builds.

Obviously this is not a very high priority, as one can just build their own. We do :)

exclude hypothesis in setup.py

hypothesis has a bug with easy_install, see here. Since it's not actually being used in the code yet, including it in setup.py will break projects that have "spark-testing-base" in tests_require in their setup.py.

how to test foreachRDD in python

How would one go about testing functions written for foreachRDD in Python?

For the output operation, I would like to write to Kafka.

I'm thinking that in order to do this, I should either write to Kafka or to a mock Kafka operation that can hold the output written from the foreachRDD function. Also, spark-testing-base should have a "run_func"-like method to test functions used in foreachRDD.

Compilation error with RddGenerator

Hi,

I'm playing with RddGenerator and I'm having a hard time figuring out how to fix a compilation error.
My test case is very simple, just an extract from "SampleScalaCheckTest":

import com.holdenkarau.spark.testing.{RDDGenerator, SharedSparkContext}
import org.scalacheck.Arbitrary
import org.scalacheck.Prop._
import org.scalatest.FunSuite
import org.scalatest.prop.Checkers

class RddGeneratorTest extends FunSuite with SharedSparkContext with Checkers {

  test("map should not change number of elements") {
    val property =
      forAll(RDDGenerator.genRDD[String](sc)(Arbitrary.arbitrary[String])) {
        rdd => rdd.map(_.length).count() == rdd.count()
      }

    check(property)
  }
}

and here is my compilation error:

Error:(13, 45) not enough arguments for method genRDD: (implicit evidence$1: scala.reflect.ClassTag[String], implicit a: org.scalacheck.Arbitrary[String])org.scalacheck.Gen[org.apache.spark.rdd.RDD[String]].
Unspecified value parameter a.
forAll(RDDGenerator.genRDD[String](sc)(Arbitrary.arbitrary[String])) {
^

I run the test in IntelliJ, but the SampleScalaCheckTest test works fine when it is also run in IntelliJ from the cloned project.

The only difference I can think of is that I'm using Maven to set up my project.
Here is my pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.github.rbrugier</groupId>
    <artifactId>spark-testing-example</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.10.4</scala.version>
        <scala.dep.version>2.10</scala.dep.version>
        <spark.version>1.6.0</spark.version>
        <maven-scala-plugin.version>2.15.2</maven-scala-plugin.version>
        <maven.compiler.plugin>3.0</maven.compiler.plugin>
    </properties>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <finalName>${project.artifactId}</finalName>

        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <scalaCompatVersion>${scala.dep.version}</scalaCompatVersion>
                    <recompileMode>incremental</recompileMode>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven.compiler.plugin}</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.dep.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--spark-testing has a dependency to spark-sql, spark-hive, spark-mllib -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.dep.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.dep.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.dep.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.dep.version}</artifactId>
            <version>2.2.6</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.holdenkarau</groupId>
            <artifactId>spark-testing-base_${scala.dep.version}</artifactId>
            <version>${spark.version}_0.3.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

Am I missing something I need to import in this context?

Thanks in advance for the help!

including spark-testing-base gives me "No FileSystem for scheme: s3n" at DataFrameReader.load

Hi Holden,
This could be a sbt newbie question and just let me know if it is(and I'll research more):

When I add a dependency on spark-testing-base in my sbt file, previously working code that uses the spark-redshift package, specifically the DataFrameReader.load call, fails with the following error:

Exception in thread "main" java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at com.databricks.spark.redshift.Utils$.assertThatFileSystemIsNotS3BlockFileSystem(Utils.scala:124)
    at com.databricks.spark.redshift.RedshiftRelation.<init>(RedshiftRelation.scala:48)
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:49)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:125)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)
..

The calling code is:

sqlContext.read
      .format("com.databricks.spark.redshift")
      .option("url", redshift_url)
      .option("query", query)
      .option("tempdir", "s3n://i-love-unit-testing/")
      .load() // <- fails here.

My spark related sbt additions are:

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.5.2" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.2" % "provided"
libraryDependencies += "com.holdenkarau" %% "spark-testing-base" % "1.5.1_0.2.1" % "provided"
libraryDependencies += "com.databricks" %% "spark-csv" % "latest.release"
libraryDependencies += "com.databricks" %% "spark-redshift" % "latest.release"

I also tried without "provided" and it did not work either.

Streaming context is stopped prematurely, which prevents the sequence of batches from being executed fully (visible on slower machines)

Hi,

I am struggling to run Spark Streaming tests on slower boxes. Apparently the streaming context is being closed too soon (before every batch can be processed). My code:

class StreamToFileDumperSparkTest extends StreamingActionBase {
  test("smoke integration test for dumper transformation") {
    val input = List(List(data1), List(data2), List(data3))
    runAction(input, (s: DStream[String]) => s.foreachRDD(/*save some files*/))
    // here I wait for the files to become available
    val result = sc.textFile(some_path).collect().sorted
    result should equal(inputData)
  }
}

The exception that supports my suspicion that the streaming context is being closed too soon is the following:

15/11/17 12:02:24 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[15] at foreachRDD at StreamToFileDumper.scala:57), which is now runnable
Exception in thread "streaming-job-executor-0" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Object.wait(Object.java:503)
    at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:559)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1912)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1124)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1065)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1065)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:989)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:965)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:965)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply$mcV$sp(PairRDDFunctions.scala:951)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:951)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$3.apply(PairRDDFunctions.scala:951)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:950)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply$mcV$sp(PairRDDFunctions.scala:909)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:907)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$2.apply(PairRDDFunctions.scala:907)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:907)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply$mcV$sp(RDD.scala:1444)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1432)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$2.apply(RDD.scala:1432)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1432)
    at com.nokia.ph.kinesis2s3.spark.StreamToFileDumper.save(StreamToFileDumper.scala:45)
    at com.nokia.ph.kinesis2s3.spark.StreamToFileDumper$$anonfun$process$1.apply(StreamToFileDumper.scala:57)
    at com.nokia.ph.kinesis2s3.spark.StreamToFileDumper$$anonfun$process$1.apply(StreamToFileDumper.scala:57)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:42)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:399)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:34)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:218)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:218)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:217)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
15/11/17 12:02:24 INFO JobScheduler: Stopped JobScheduler
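
The step in the test above that waits for the output files is where timing on a slow machine matters. Below is a minimal sketch of a bounded polling wait, assuming the output can be detected by a single path appearing on the local file system. `WaitForFileSketch`, `waitForFile`, and the marker file are hypothetical names for illustration, not part of spark-testing-base. It is written in Java so that it can be called from Scala or Java tests. It does not stop the streaming context from being shut down early; it only makes the wait in the test explicit and bounded.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;

class WaitForFileSketch {

    /**
     * Polls until the given path exists or the timeout elapses.
     * Returns true if the path appeared in time, false otherwise.
     */
    static boolean waitForFile(Path path, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (Files.exists(path)) {
                return true;
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical marker file standing in for output written inside foreachRDD.
        Path marker = Files.createTempFile("batch-output", ".done");
        // The file exists, so this returns immediately with true.
        System.out.println(waitForFile(marker, Duration.ofSeconds(5), Duration.ofMillis(100)));

        Files.delete(marker);
        // The file is gone, so this gives up after the timeout with false.
        System.out.println(waitForFile(marker, Duration.ofMillis(300), Duration.ofMillis(100)));
    }
}
```

With saveAsTextFile, the `_SUCCESS` marker that Hadoop's output committer normally writes is one candidate path to wait on, though whether it is written depends on the job configuration.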

Testing Spark Actions

Any thoughts about testing an action? Because an action wouldn't return anything, it doesn't exactly fit the current SparkTestingBase.

It would still be useful to use the framework for setup/execution/teardown when testing code like this, with mocks to verify expectations:

  def doAction(kafkaStream: DStream[(String, BaseEvent)]): Unit = {
    kafkaStream.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        // code under test
      })
    })
  }

Point Out Spark 1.6's Hive Dependency

When using Spark 1.6 there is a runtime dependency on Spark Hive. There should be some mention of this in the readme.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>test</scope>
    </dependency>

Depending on your POM, the test scope may need to be removed.

Provide a reset of localRootDirs for each test

Hi,

  1. Thanks for the project!
  2. Although I'm writing tests in Java with JUnit (so I haven't managed to use the LocalSparkContext trait), I think there is a scenario that can be improved:

Suppose I have several tests in a test suite, with @BeforeEach and @AfterEach methods that create and delete a temp dir (so that each test has a separate environment, separate local dirs, etc.).
I tried setting spark.local.dir with something like sparkConf.set("spark.local.dir", tempDir+"/spark");
It works, but only for the first test, probably because of how the Utils.getOrCreateLocalRootDirs method is implemented: it has a comment saying that it caches the local dir.
There is another method in Utils:

/** Used by unit tests. Do not call from other places. */
  private[spark] def clearLocalRootDirs(): Unit = {
    localRootDirs = null
  }

which can clear the local root dirs; however, it's private, so my test can't use it.
Would you consider adding a call to clearLocalRootDirs to your LocalSparkContext stop method?
In any case, I'd like to hear what the best way to approach this problem is. Should I open a JIRA in the Spark repo to make clearLocalRootDirs public?

P.S. I needed to investigate this because I saw directories that weren't cleaned up after tests (the second test in the suite reuses the local dir that was deleted at the end of the first test, and so recreates it).
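
The caching behaviour described above can be modelled in a few lines. This is a toy model built only from what the post describes, not Spark's implementation: `LocalDirCacheSketch`, `getOrCreateLocalDir`, and `clearLocalDir` are hypothetical stand-ins for the cached value and the reset method. It shows why a directory configured for the second test is ignored, and why the first test's directory reappears after it was deleted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

class LocalDirCacheSketch {

    // Stand-in for the cached value described in the post; not Spark's actual field.
    private static Path cachedLocalDir = null;

    /** Resolves the directory from the configured value on first use only, then reuses it. */
    static Path getOrCreateLocalDir(Path configured) throws IOException {
        if (cachedLocalDir == null) {
            cachedLocalDir = configured;
        }
        // Recreates the cached directory even if an earlier test already deleted it.
        Files.createDirectories(cachedLocalDir);
        return cachedLocalDir;
    }

    /** Forgets the cached value so the next call honours the new configuration. */
    static void clearLocalDir() {
        cachedLocalDir = null;
    }

    public static void main(String[] args) throws IOException {
        Path first = Files.createTempDirectory("test1").resolve("spark");
        Path second = Files.createTempDirectory("test2").resolve("spark");

        Path usedByTest1 = getOrCreateLocalDir(first);
        Files.delete(usedByTest1); // test 1 cleans up after itself

        // Test 2 asks for its own directory but gets the stale, recreated one.
        Path usedByTest2 = getOrCreateLocalDir(second);
        System.out.println(usedByTest2.equals(first)); // prints true

        // After a reset, the newly configured directory is used.
        clearLocalDir();
        System.out.println(getOrCreateLocalDir(second).equals(second)); // prints true
    }
}
```

In this model the leftover directory is the first test's path, recreated during the second test, which matches the symptom reported in the P.S.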

More documentation!

Hi!

Can we get some usage documentation? I'm having to scour the web here. 😃

Happy to help, of course.

Run Streaming Tests Distributively

Currently we only support input and output as Lists. We need to add support for general InputStreams and run the tests in a distributed fashion.

RDDComparisons Assert Doesn't Fail

I'm using spark-testing-base with Java. I'm using the RDDComparisons to compare the outputs of two RDDs. Here is the code:

        List<String> input = Arrays.asList("1\tHeart", "2\tDiamonds");
        JavaRDD<String> inputRDD = jsc().parallelize(input);
        JavaPairRDD<String, Integer> result = RefactoredForTests.runETL(inputRDD);

        List<Tuple2<String, Integer>> expectedInput = Arrays.asList(
                new Tuple2<String, Integer>("Heart", 1),
                new Tuple2<String, Integer>("Diamonds", 12));
        JavaPairRDD<String, Integer> expectedRDD = jsc().parallelizePairs(expectedInput);

        ClassTag<Tuple2<String, Integer>> tag = scala.reflect.ClassTag$.MODULE$.apply(Tuple2.class);

        // Doesn't assert failure
        RDDComparisons.compareWithOrder(JavaPairRDD.toRDD(result),
                JavaPairRDD.toRDD(expectedRDD), tag);

        List<Tuple2<String, Integer>> expectedList = expectedRDD.collect();
        List<Tuple2<String, Integer>> actualList = result.collect();

        // Does fail
        for (int i = 0; i < expectedList.size(); i++) {
            assertEquals(expectedList.get(i), actualList.get(i));
        }

I've manually changed the expected output so that the test should fail (the value should be 2 but I have 12). Neither compareWithOrder nor compare fails during the check. However, the manual check that I wrote does fail correctly.

Is there something extra I need to do for the compare?
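
The manual loop above only walks `expectedList.size()` positions, so it throws if the actual list is shorter and misses extra trailing elements. Below is a minimal sketch of an order-sensitive check on the collected lists that also compares sizes. `FirstMismatchSketch` and `firstMismatch` are hypothetical names, `Map.entry` stands in for `Tuple2`, and Java 9+ is assumed. One thing worth checking, stated here as an assumption rather than something the post confirms: if a comparison helper returns its result instead of throwing, the test passes unless that result is asserted on, as the usage in `main` illustrates.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

class FirstMismatchSketch {

    /** Returns a description of the first difference, or empty if both lists are equal in order. */
    static <T> Optional<String> firstMismatch(List<T> expected, List<T> actual) {
        int common = Math.min(expected.size(), actual.size());
        for (int i = 0; i < common; i++) {
            if (!Objects.equals(expected.get(i), actual.get(i))) {
                return Optional.of("index " + i + ": expected " + expected.get(i)
                        + " but was " + actual.get(i));
            }
        }
        // Elements in the shared prefix match, so any remaining difference is in length.
        if (expected.size() != actual.size()) {
            return Optional.of("size: expected " + expected.size()
                    + " but was " + actual.size());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Same values as in the post: the expected data deliberately contains 12 instead of 2.
        List<Map.Entry<String, Integer>> expected =
                List.of(Map.entry("Heart", 1), Map.entry("Diamonds", 12));
        List<Map.Entry<String, Integer>> actual =
                List.of(Map.entry("Heart", 1), Map.entry("Diamonds", 2));

        Optional<String> mismatch = firstMismatch(expected, actual);
        // The returned value has to be checked explicitly for the test to fail.
        if (mismatch.isPresent()) {
            throw new AssertionError(mismatch.get());
        }
    }
}
```

Running `main` as written throws an AssertionError for index 1, because the second pair differs.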
