spark-sklearn's Introduction

Deprecation

This project is deprecated. We now recommend using scikit-learn with the Joblib Apache Spark Backend (joblibspark) to distribute scikit-learn hyperparameter tuning tasks on a Spark cluster.

You need pyspark>=2.4.4 and scikit-learn>=0.21 to use Joblib Apache Spark Backend, which can be installed using pip:

pip install joblibspark

The following example shows how to distribute GridSearchCV on a Spark cluster using joblibspark. The same applies to RandomizedSearchCV.

from sklearn import svm, datasets
from sklearn.model_selection import GridSearchCV
from joblibspark import register_spark
from joblib import parallel_backend  # imported from joblib directly; works with any scikit-learn version

register_spark() # register spark backend

iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC(gamma='auto')

clf = GridSearchCV(svr, parameters, cv=5)

with parallel_backend('spark', n_jobs=3):
    clf.fit(iris.data, iris.target)
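The same pattern with RandomizedSearchCV might look like the sketch below. It assumes joblibspark and a working Spark installation for the "spark" backend. The fallback to joblib's built-in "loky" backend, the parameter range for C, and n_iter=4 are assumptions added for illustration, so that the snippet also runs on a single machine.

```python
from joblib import parallel_backend
from scipy.stats import uniform
from sklearn import datasets, svm
from sklearn.model_selection import RandomizedSearchCV

# Assumption: fall back to joblib's local "loky" backend when joblibspark
# is not installed, so the sketch also runs without a cluster.
try:
    from joblibspark import register_spark
    register_spark()  # register the "spark" backend with joblib
    backend = "spark"
except ImportError:
    backend = "loky"

iris = datasets.load_iris()
param_distributions = {
    "kernel": ["linear", "rbf"],
    "C": uniform(loc=0.1, scale=10),  # continuous range [0.1, 10.1]
}
search = RandomizedSearchCV(
    svm.SVC(gamma="auto"),
    param_distributions,
    n_iter=4,
    cv=5,
    random_state=0,
)

# The candidate fits are dispatched through whichever backend is active.
with parallel_backend(backend, n_jobs=2):
    search.fit(iris.data, iris.target)

print(search.best_params_)
```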

Scikit-learn integration package for Apache Spark

This package contains some tools to integrate the Spark computing framework with the popular scikit-learn machine learning library. Among other things, it can:

  • train and evaluate multiple scikit-learn models in parallel. It is a distributed analog to the multicore implementation included by default in scikit-learn
  • convert Spark's Dataframes seamlessly into numpy ndarray or sparse matrices
  • (experimental) distribute Scipy's sparse matrices as a dataset of sparse vectors

It focuses on problems that have a small amount of data and that can be run in parallel. For small datasets, it distributes the search for estimator parameters (GridSearchCV in scikit-learn) using Spark. For datasets that do not fit in memory, we recommend using the distributed implementation in Spark MLlib (https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html).

This package distributes simple tasks like grid-search cross-validation. It does not distribute individual learning algorithms (unlike Spark MLlib).

Installation

This package is available on PyPI:

pip install spark-sklearn

This project is also available as a Spark package.

The developer version has the following requirements:

  • scikit-learn 0.18 or 0.19. Later versions may work, but the tests are currently incompatible with 0.20.
  • Spark >= 2.1.1. Spark may be downloaded from the Spark website. In order to use this package, you need to use the pyspark interpreter or another Spark-compliant Python interpreter. See the Spark guide for more details.
  • nose (testing dependency only)
  • pandas, if using the pandas integration or testing. pandas==0.18 has been tested.

If you want to use a developer version, you just need to make sure the python/ subdirectory is in the PYTHONPATH when launching the pyspark interpreter:

PYTHONPATH=$PYTHONPATH:./python $SPARK_HOME/bin/pyspark

You can directly run tests:

cd python && ./run-tests.sh

This requires the environment variable SPARK_HOME to point to your local copy of Spark.

Example

Here is a simple example that runs a grid search with Spark. See the Installation section on how to install the package.

from sklearn import svm, datasets
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC(gamma='auto')
clf = GridSearchCV(sc, svr, parameters)  # sc is the SparkContext, predefined in the pyspark shell
clf.fit(iris.data, iris.target)

This classifier can be used as a drop-in replacement for any scikit-learn classifier, with the same API.

Documentation

API documentation is currently hosted on GitHub Pages. To build the docs yourself, see the instructions in docs/.

spark-sklearn's People

Contributors

ajaysaini725, digital10111, hahnicity, jkbradley, kautenja, kornosk, mengxr, shaunswanson, smurching, srowen, thunterdb, vlad17, weichenxu123, zero323

spark-sklearn's Issues

Clarify RandomizedSearchCV documentation for sampling with replacement

At this line, it may be better to explicitly mention which parameters will be sampled with replacement if any one of them is a distribution:

https://github.com/databricks/spark-sklearn/blob/master/python/spark_sklearn/random_search.py#L27

Are all parameters (those given as distributions and those given as lists of values) sampled with replacement in this scenario, or only parameters given by distributions?

If so, this documentation would be better stated as:

sampling with replacement is used for all parameters.
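One way to check the behaviour empirically is to look at scikit-learn's own ParameterSampler, on the assumption that the docstring linked above mirrors it. The sketch below compares a list-only parameter space with a mixed one. The parameter values and n_iter settings are illustrative only.

```python
from scipy.stats import uniform
from sklearn.model_selection import ParameterSampler

lists_only = {"kernel": ["linear", "rbf"], "C": [1, 10]}
mixed = {"kernel": ["linear", "rbf"], "C": uniform(0, 10)}

# All parameters are lists: the 4 grid points are drawn without replacement,
# so asking for 4 samples yields each combination exactly once.
no_replacement = list(ParameterSampler(lists_only, n_iter=4, random_state=0))
distinct = {(d["kernel"], d["C"]) for d in no_replacement}

# One parameter is a distribution: every iteration draws each parameter
# independently, so the list-valued "kernel" necessarily repeats.
with_replacement = list(ParameterSampler(mixed, n_iter=50, random_state=0))
kernels = [d["kernel"] for d in with_replacement]

print(len(distinct), len(with_replacement), sorted(set(kernels)))
```

In the mixed case, 50 draws from a two-element list can only be produced by sampling that list with replacement.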

Implement parallelized RandomizedSearchCV

TypeError: fit() argument after ** must be a mapping, not NoneType

I am trying to run the sample code with iris dataset:

from sklearn import svm, datasets
from pyspark import SparkContext
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC()
sc = SparkContext()
clf = GridSearchCV(sc, svr, parameters)
clf.fit(iris.data, iris.target)

Running this throws a TypeError:

Traceback (most recent call last):
"<stdin>", line 1, in <module>
"spark_sklearn/grid_search.py", line 272, in fit
return self._fit(X, y, groups, ParameterGrid(self.param_grid))
"spark_sklearn/grid_search.py", line 400, in _fit
best_estimator.fit(X, y, **fit_params)
TypeError: fit() argument after ** must be a mapping, not NoneType
Environment Info:
python 3.5.2
spark 2.2.0
spark_sklearn 0.2.0
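The traceback suggests that fit_params was None at the point where it was unpacked with **. The sketch below reproduces that failure in isolation and shows one possible guard. DummyEstimator and refit are hypothetical names for illustration, not the project's code or its actual fix.

```python
class DummyEstimator:
    """Hypothetical stand-in for a scikit-learn estimator."""

    def fit(self, X, y, **fit_params):
        # Record the keyword arguments so the caller can inspect them.
        self.fit_params_ = fit_params
        return self


def refit(estimator, X, y, fit_params=None):
    """Refit an estimator, treating a missing fit_params as 'no extras'."""
    # Unpacking None with ** raises TypeError, so substitute an empty dict.
    if fit_params is None:
        fit_params = {}
    return estimator.fit(X, y, **fit_params)


# Reproduce the reported failure: ** needs a mapping, and None is not one.
try:
    DummyEstimator().fit([[0]], [0], **None)
except TypeError as exc:
    error_message = str(exc)

# With the guard in place, the same call goes through.
model = refit(DummyEstimator(), [[0]], [0])
print(error_message)
print(model.fit_params_)
```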

ImportError: No module named 'converter'

Hello,

I am trying to use the example code from the README for spark-sklearn. I have installed the latest version of Anaconda, and ran pip install spark_sklearn to install the package. The following code:

from sklearn import svm, grid_search, datasets
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC()
clf = GridSearchCV(sc, svr, parameters)
clf.fit(iris.data, iris.target)

results in the following traceback:

---------------------------------------------------------------------------
ImportError                               Traceback (most recent call last)
<ipython-input-1-d38860ad054d> in <module>()
      1 from sklearn import svm, grid_search, datasets
----> 2 from spark_sklearn import GridSearchCV
      3 iris = datasets.load_iris()
      4 parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
      5 svr = svm.SVC()

/Users/randal_olson/anaconda/lib/python3.5/site-packages/spark_sklearn-0.1.1-py3.5.egg/spark_sklearn/__init__.py in <module>()
      1 from scipy.sparse import csr_matrix
      2 
----> 3 from converter import Converter
      4 from grid_search import GridSearchCV
      5 from udt import CSRVectorUDT

ImportError: No module named 'converter'

I have tried to install the latest version of spark-sklearn from this repository to no avail. Please advise me how to proceed.
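The traceback shows `from converter import Converter` inside the package's `__init__.py`. That is an implicit relative import, which Python 3 no longer resolves against the package directory. The sketch below builds two throwaway packages to illustrate the difference. The package and module names (demo_implicit, demo_explicit, converter_demo) are made up for the demonstration.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def make_package(root, name, init_source):
    """Create a tiny package with a converter_demo module and the given __init__."""
    pkg = Path(root) / name
    pkg.mkdir()
    (pkg / "converter_demo.py").write_text("class Converter:\n    pass\n")
    (pkg / "__init__.py").write_text(init_source)


with tempfile.TemporaryDirectory() as root:
    # Python 2 style: looks for a top-level module under Python 3.
    make_package(root, "demo_implicit", "from converter_demo import Converter\n")
    # Explicit relative import: resolved inside the package itself.
    make_package(root, "demo_explicit", "from .converter_demo import Converter\n")

    sys.path.insert(0, root)
    try:
        try:
            importlib.import_module("demo_implicit")
            implicit_ok = True
        except ImportError:
            implicit_ok = False

        explicit = importlib.import_module("demo_explicit")
        explicit_ok = hasattr(explicit, "Converter")
    finally:
        sys.path.remove(root)

print(implicit_ok, explicit_ok)
```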

2.0.0 TODOs

Various TODOs are left around the code regarding CI scripts and a 2.0.0 spark dependency, which need to be resolved once the release is available.

Error When Calling toPandas()

Hi,

My scripts are throwing errors when running on clusters:

Converting training data to pandas dataframe

Traceback (most recent call last):
File "/tmp/49a99d28-350b-4942-ab63-f7efa0d2f0ec/random-forest-sklearn.py", line 244, in
training_data = sk_converter.toPandas(trainingSet)
File "/usr/local/lib/python3.4/dist-packages/spark_sklearn/converter.py", line 159, in toPandas
return df.select(*cols).toPandas()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 1442, in toPandas
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 311, in collect
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
File "/usr/lib/python3.4/socket.py", line 371, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
16/12/15 22:49:04 ERROR org.apache.spark.api.python.PythonRDD: Error while sending iterator
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:492)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:504)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:700)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:700)
at org.apache.spark.api.python.PythonRDD$$anon$2$$anonfun$run$1.apply(PythonRDD.scala:700)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1310)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:701)
16/12/15 22:49:04 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@54284c76{HTTP/1.1}{0.0.0.0:4040}

However, the script works fine when running on a small dataset locally. Memory should not be the problem: each machine in my cluster (both executors and driver) has roughly 200 GB, and my data is only 16 GB. Does anyone know what the issue might be? I appreciate your help in advance.

@thunterdb

toSpark() must be called with Converter instance as first argument

I want to convert a sklearn logistic regression model to Spark but it is not working for me:

from spark_sklearn import Converter
logreg_spark_ml = Converter.toSpark(logreg_cv)

TypeError: unbound method toSpark() must be called with Converter instance as first argument (got GridSearchCV instance instead)
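The error message indicates that toSpark was called on the Converter class rather than on an instance. The sketch below uses hypothetical stand-in classes, not the real library, to show the call shape. It assumes the converter is constructed with a SparkContext, and additionally assumes that the fitted estimator (best_estimator_) rather than the search object is what should be converted.

```python
class Converter:
    """Hypothetical stand-in with the same call shape as spark_sklearn.Converter."""

    def __init__(self, sc):
        self.sc = sc  # the SparkContext the converter works with

    def toSpark(self, model):
        # The real method would build a Spark ML model; this one only
        # reports which object it received.
        return "converted " + type(model).__name__


class LogisticRegression:
    """Placeholder for the fitted scikit-learn estimator."""


class GridSearchCV:
    """Placeholder for a fitted search that wraps the estimator."""

    def __init__(self, best_estimator):
        self.best_estimator_ = best_estimator


sc = object()  # placeholder for a SparkContext
logreg_cv = GridSearchCV(LogisticRegression())

# Calling through the class leaves no instance to bind to `self`, so it fails
# (with a differently worded TypeError on Python 3 than on Python 2).
try:
    Converter.toSpark(logreg_cv)
except TypeError as exc:
    class_call_error = str(exc)

# Calling through an instance works.
result = Converter(sc).toSpark(logreg_cv.best_estimator_)
print(result)
```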

Spark 2.3 compatible?

It seems like a great idea and use case to run sklearn in a distributed fashion through Spark!
I haven't seen updates to databricks/spark-sklearn for almost a year. Is this an active project?
Is this known to work with Spark 2.3?
Thank you.

Need for an example

More than an issue, I need an example regarding this statement in the main README:

convert Spark's Dataframes seamlessly into numpy ndarray or sparse matrices

How can I do this with spark-sklearn? Can you provide an example?

Many thanks in advance.

gridsearch with StratifiedShuffleSplit error

Hi, I am getting this error when running the code below
'StratifiedShuffleSplit' object is not iterable

from sklearn import svm, grid_search, datasets
from sklearn.model_selection import StratifiedShuffleSplit
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()

parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}

svr = svm.SVC()
sss = StratifiedShuffleSplit(n_splits=10, test_size=0.5)
clf = GridSearchCV(sc, svr, parameters, cv=sss)
clf.fit(iris.data, iris.target)
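The splitter classes in sklearn.model_selection are not iterable themselves; they produce the splits through their split(X, y) method. One possible workaround, assuming the installed GridSearchCV accepts an iterable of (train, test) index pairs for cv, is to materialise the splits first. The sketch below uses scikit-learn's own GridSearchCV as a stand-in, since it needs no SparkContext; random_state=0 and gamma='auto' are additions for reproducibility.

```python
from sklearn import datasets, svm
from sklearn.model_selection import GridSearchCV, StratifiedShuffleSplit

iris = datasets.load_iris()
parameters = {"kernel": ("linear", "rbf"), "C": [1, 10]}

sss = StratifiedShuffleSplit(n_splits=10, test_size=0.5, random_state=0)
# split() yields (train_indices, test_indices) pairs; a list of them is a
# plain iterable that can be passed as cv.
splits = list(sss.split(iris.data, iris.target))

clf = GridSearchCV(svm.SVC(gamma="auto"), parameters, cv=splits)
clf.fit(iris.data, iris.target)
print(len(splits), clf.best_params_)
```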

Native linalg libs not getting picked up

When I run unit tests, the following is printed to stderr as they run:

test_LinearRegression_spark2skl (spark_sklearn.converter_test.ConverterTests) ... 16/06/27 16:49:19 WARN WeightedLeastSquares: regParam is zero, which might cause numerical instability and overfitting.
16/06/27 16:49:19 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
16/06/27 16:49:19 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
16/06/27 16:49:19 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
16/06/27 16:49:19 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK

And yet I have the appropriate packages:

vlad@vlad-databricks:~$ dpkg -s libgfortran3 libgfortran-4.9-dev libgfortran-5-dev liblapack-dev libblas-dev >/dev/null ; echo $?
0

Long Time to Collect Results of Distributed Spark-Sklearn Training

I'm running 15 combinations of a Logistic Regression model with spark-sklearn and I'll see that all tasks have completed but there is a huge amount of time to collect all of the results. I'm guessing it's the number of my coefficients that I'm bringing back to the driver. But I've noticed it several times when I'm working with wide datasets or deep random forests. Is it just expected due to network traffic?

Data set size: 31,358 rows, 10000 columns

param_grid = [
  dict(
  penalty=['l2'], 
  C = [1.0, 0.5, 0.1], 
  solver = ['newton-cg', 'lbfgs', 'sag']
  ),
  dict(
  penalty=['l1', 'elasticnet'], 
  C = [1.0, 0.5, 0.1], 
  solver = ['saga',]
  )
]
grid = GridSearchCV(sc, estimator=LogisticRegression(max_iter=500), param_grid=param_grid, n_jobs=-1, cv=5)
grid_result = grid.fit(X_train, y_train)

Environment:

  • Azure Databricks ML 5.5 Runtime
  • 9 Worker nodes with 56GB and 8 cores each
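For reference, the number of model fits behind the grid above can be worked out from the parameter lists. The sketch below is plain arithmetic over the grid from this report; count_candidates is a hypothetical helper. It only counts tasks and does not by itself explain the collection time.

```python
from math import prod

# The grid from the report, written out as plain lists.
param_grid = [
    dict(penalty=["l2"], C=[1.0, 0.5, 0.1], solver=["newton-cg", "lbfgs", "sag"]),
    dict(penalty=["l1", "elasticnet"], C=[1.0, 0.5, 0.1], solver=["saga"]),
]
cv = 5


def count_candidates(grid):
    """Sum, over the grid's blocks, the product of each block's list lengths."""
    return sum(prod(len(values) for values in block.values()) for block in grid)


n_candidates = count_candidates(param_grid)  # 1*3*3 + 2*3*1
n_fits = n_candidates * cv                   # one fit per candidate per fold
print(n_candidates, n_fits)                  # prints "15 75"
```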

Use generate sklearn UDT within gapply() [SPARK-16062 blocks this]

Currently, KeyedModel fitting in KeyedEstimator._fit is implemented by generating an array of a single serialized estimator, requiring an additional pass over the resulting dataframe which deserializes the UDT.

This is necessary because of a pyspark bug, and the circuitous implementation should be straightened out once the UDT issues are resolved (SPARK-16062).

n_jobs for refitting

For the refit step, it would be convenient to be able to specify n_jobs. After running a grid search, it would be nice to use more of the cores on the master for the final refit step. The n_jobs parameter exists for compatibility, but it is currently a no-op. I think it makes sense to have that parameter pass through to the final model refit step.

spark-sklearn on windows- not working on local

I am trying to execute this code from the spark-sklearn API documentation. I'm running on Windows 7 with the latest spark-sklearn version, executing inside the pyspark shell. I'm running Spark 2.3.2 (I also tried 2.4.0, with the same error).

from sklearn.linear_model import LinearRegression
from sklearn.cluster import KMeans
from pyspark.ml.linalg import Vectors, Matrices, MatrixUDT
from pyspark.sql.functions import udf
from pyspark.sql import SparkSession
from spark_sklearn.util import createLocalSparkSession
from spark_sklearn.keyed_models import KeyedEstimator
spark = createLocalSparkSession()
df = spark.createDataFrame([(user,
                             Vectors.dense([i, i ** 2, i ** 3]),
                             0.0 + user + i + 2 * i ** 2 + 3 * i ** 3)
                            for user in range(3) for i in range(5)])
df = df.toDF("key", "features", "y")
df.where("5 < y and y < 10").sort("key", "y").show()

km = KeyedEstimator(sklearnEstimator=LinearRegression(), yCol="y").fit(df)
def printFloat(x):
    rounded = round(x, 2)
    return "{:.2f}".format(0 if rounded == 0 else rounded)

def printModel(model):
    coef = "[" + ", ".join(map(printFloat, model.coef_)) + "]"
    intercept = printFloat(model.intercept_)
    return "intercept: {} coef: {}".format(intercept, coef)

km.keyedModels.columns

printedModels = udf(printModel)("estimator").alias("linear fit")
km.keyedModels.select("key", printedModels).sort("key").show(truncate=False)

Running this, I get the following error at the final show() call, km.keyedModels.select("key", printedModels).sort("key").show(truncate=False):

image
I cannot seem to find any solution online. Since the show before this is working, clearly my spark seems to be working.

Namespace issue with pyspark.ml and pyspark.mllib

I tried to run the default example on the README page

from sklearn import svm, grid_search, datasets
from spark_sklearn import GridSearchCV
iris = datasets.load_iris()
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}
svr = svm.SVC()
clf = GridSearchCV(sc, svr, parameters)
clf.fit(iris.data, iris.target)

on Spark, but got the following error:

ImportError: No module named linalg

The code that causes this is the import of pyspark.ml.linalg on this line in converter.py in spark_sklearn

We are running Spark 1.6, and according to the documentation, in 1.6 and above, linalg is under pyspark.mllib.linalg instead of pyspark.ml.linalg.

I'm trying to figure out whether this is an issue with my versions or something else, given that the README mentions Spark 2.0 compatibility. If this is indeed an issue with spark_sklearn, it looks like it has been broken since at least 1.6.0. Can someone confirm?

test_scipy_sparse (spark_sklearn.converter_test.CSRVectorUDTTests) failure

Getting this test failure:

 (spark_sklearn.converter_test.CSRVectorUDTTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/stoker/spark-sklearn/python/spark_sklearn/converter_test.py", line 83, in test_scipy_sparse
    self.assertEqual(df.count(), 1)
  File "/usr/local/spark/python/pyspark/sql/dataframe.py", line 522, in count
    return int(self._jdf.count())
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o652.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
        at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2830)
        at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2829)
        at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
        at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
        at org.apache.spark.sql.Dataset.count(Dataset.scala:2829)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.dtype)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:707)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:175)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:99)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:112)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:188)
        at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(SerDeUtil.scala:187)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:435)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:441)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more


----------------------------------------------------------------------
Ran 87 tests in 398.742s

FAILED (SKIP=1, errors=1)

My versions:
python = 2.7.15 Anaconda
scipy = 1.1.0
numpy = 1.11.1
scikit-learn = 0.19.2
pyspark = 2.4.3

I looked quite a bit at the actual issue from python's side - sending of any other kind of matrix/dataframe I could think of went off fine, just seemed to be the sparse matrices. Something seemed weird about how the test was having to wrap the sparse matrix itself a bit in lists and tuples to get createDataFrame to take it, but I wasn't able to figure out how to simplify that.

My instinct says something in how that data frame is set up isn't setting something in an initializer somewhere, creating the java issue.

Multiple scorers

The newest scikit-learn docs (http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) say that:

scoring: For evaluating multiple metrics, either give a list of (unique) strings or a dict with names as keys and callables as values.

However, spark-sklearn does not seem to accept the scoring parameter in the form of a list.

Is there an easy way to accomplish that (submitting several scorers, e.g. scoring = ['accuracy', 'f1', 'roc_auc', 'average_precision'])? Or is it perhaps on your roadmap? If not, approximately how much time would the implementation take for somebody new to the project (like me)?
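For comparison, plain scikit-learn's GridSearchCV (0.19 or later) accepts a list of scorer names, provided refit names the metric used to pick the best model. The sketch below runs locally on the iris data; the metrics accuracy and f1_macro were chosen as an assumption because iris is a multiclass problem, where the binary f1 and roc_auc scorers from the question would not apply directly.

```python
from sklearn import datasets, svm
from sklearn.model_selection import GridSearchCV

iris = datasets.load_iris()
parameters = {"kernel": ("linear", "rbf"), "C": [1, 10]}

clf = GridSearchCV(
    svm.SVC(gamma="auto"),
    parameters,
    scoring=["accuracy", "f1_macro"],  # several metrics evaluated per candidate
    refit="accuracy",                  # metric that selects best_estimator_
    cv=5,
)
clf.fit(iris.data, iris.target)

# Each metric gets its own set of columns in cv_results_.
for metric in ("accuracy", "f1_macro"):
    print(metric, clf.cv_results_["mean_test_" + metric])
```

Because this is ordinary scikit-learn, the fit call could presumably be wrapped in a joblib parallel_backend('spark') block, as in the deprecation notice at the top of the README, to distribute it.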

Crashing for larger data set

I am running a Spark context with the following configuration:

from pyspark import SparkConf, SparkContext
conf = (SparkConf()
         .setMaster("spark-master-url")
         .setAppName("PySparkShell")
         .set("spark.executor.memory", "6800M"))
sc = SparkContext(conf = conf)

The program is working fine when X_train length is 5000 but fails when the size is increased to 12000.

Spark keeps crashing with the following errors:

 Lost task 13.0 in stage 1.0 (TID 109, 172.31.8.203, executor 1): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:230)
        at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
        at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
        at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
        at java.io.DataInputStream.readInt(DataInputStream.java:392)
        at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
        ... 11 more

More details here

Lint script

Repo requires a lint script for style checks, which should be active in the travis CI script.

Spark Broadcast exceeding executor memory with large training data set

My X variable is between 1-4 GB. The pre_dispatch arg when initializing GridSearchCV doesn't appear to be used or have any effect, preventing me from parallelizing a decently-sized param_grid in my Spark cluster.

The X variable is being broadcast to all combinations of param_grid (48 in our case), causing memory to fill up.

X_bc = self.sc.broadcast(X)

Is there a way that the code above could pull the broadcast variables, rather than pushing them eagerly to every task?

error message: sbt.ResolveException: unresolved dependency: org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT: not found

I cloned the repo and installed the following packages:

  1. pip install spark-sklearn
  2. pip install nose

I also went to the folder spark-sklearn/python and ran:

  1. python setup.py build
  2. python setup.py install
  3. ./run run-tests.sh

Everything looked fine.

However, when I go to the spark-sklearn/ folder and run sbt package, I get an error. Please help me figure out what is wrong with my environment.

Thanks,
Jianhong

[info] Loading project definition from /jianhong/GitHub/spark-sklearn/spark-sklearn/project
[info] Set current project to spark-sklearn (in build file:/jianhong/GitHub/spark-sklearn/spark-sklearn/)
[info] Updating {file:/jianhong/GitHub/spark-sklearn/spark-sklearn/}spark-sklearn...
[info] Resolving org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT ...
[warn] module not found: org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT
[warn] ==== local: tried
[warn] /home/jxia/.ivy2/local/org.apache.spark/spark-mllib_2.10/1.6.0-SNAPSHOT/ivys/ivy.xml
[warn] ==== public: tried
[warn] https://repo1.maven.org/maven2/org/apache/spark/spark-mllib_2.10/1.6.0-SNAPSHOT/spark-mllib_2.10-1.6.0-SNAPSHOT.pom
[warn] ==== Spark Packages Repo: tried
[warn] https://dl.bintray.com/spark-packages/maven/org/apache/spark/spark-mllib_2.10/1.6.0-SNAPSHOT/spark-mllib_2.10-1.6.0-SNAPSHOT.pom
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: UNRESOLVED DEPENDENCIES ::
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn] :: org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT: not found
[warn] ::::::::::::::::::::::::::::::::::::::::::::::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] org.apache.spark:spark-mllib_2.10:1.6.0-SNAPSHOT ((sbtsparkpackage.SparkPackagePlugin) SparkPackagePlugin.scala#L241)
[warn] +- default:spark-sklearn_2.10:0.0.1-SNAPSHOT
sbt.ResolveException: unresolved dependency: org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT: not found
at sbt.IvyActions$.sbt$IvyActions$$resolve(IvyActions.scala:243)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:158)
at sbt.IvyActions$$anonfun$updateEither$1.apply(IvyActions.scala:156)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:147)
at sbt.IvySbt$Module$$anonfun$withModule$1.apply(Ivy.scala:147)
at sbt.IvySbt$$anonfun$withIvy$1.apply(Ivy.scala:124)
at sbt.IvySbt.sbt$IvySbt$$action$1(Ivy.scala:56)
at sbt.IvySbt$$anon$3.call(Ivy.scala:64)
at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:93)
at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:78)
at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:97)
at xsbt.boot.Using$.withResource(Using.scala:10)
at xsbt.boot.Using$.apply(Using.scala:9)
at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:58)
at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:48)
at xsbt.boot.Locks$.apply0(Locks.scala:31)
at xsbt.boot.Locks$.apply(Locks.scala:28)
at sbt.IvySbt.withDefaultLogger(Ivy.scala:64)
at sbt.IvySbt.withIvy(Ivy.scala:119)
at sbt.IvySbt.withIvy(Ivy.scala:116)
at sbt.IvySbt$Module.withModule(Ivy.scala:147)
at sbt.IvyActions$.updateEither(IvyActions.scala:156)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1282)
at sbt.Classpaths$$anonfun$sbt$Classpaths$$work$1$1.apply(Defaults.scala:1279)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1309)
at sbt.Classpaths$$anonfun$doWork$1$1$$anonfun$84.apply(Defaults.scala:1307)
at sbt.Tracked$$anonfun$lastOutput$1.apply(Tracked.scala:35)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1312)
at sbt.Classpaths$$anonfun$doWork$1$1.apply(Defaults.scala:1306)
at sbt.Tracked$$anonfun$inputChanged$1.apply(Tracked.scala:45)
at sbt.Classpaths$.cachedUpdate(Defaults.scala:1324)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1264)
at sbt.Classpaths$$anonfun$updateTask$1.apply(Defaults.scala:1242)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:226)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:235)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:226)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
error sbt.ResolveException: unresolved dependency: org.apache.spark#spark-mllib_2.10;1.6.0-SNAPSHOT: not found
[error] Total time: 1 s, completed Apr 13, 2016 4:11:28 PM

best_params_ missing on GridSearchCV

The best_params_ dict seems to be missing from GridSearchCV, even if refitting is enabled.
grid_search.py#L195 refers to that attribute, and it is computed in grid_search.py#L371, but it is never actually exposed after fitting. This contradicts both your own docs and the scikit-learn 0.19.1 and 0.20.0 docs. Was this attribute left out on purpose, or is this a bug? Depending on your answer, I'd be happy to provide a PR that changes either the docs or the code to resolve this.

See also #37 which uncovered this a year ago.

AttributeError: 'function' object has no attribute '_input_kwargs'

I am using python 2.7, spark-2.2.0 with hadoop2.7 and sklearn 0.19.
I get the following error:
Traceback (most recent call last):
  File "test.py", line 19, in <module>
    km = KeyedEstimator(sklearnEstimator=LinearRegression(), yCol="y").fit(df)
  File "C:\spark-2.2.0-bin-hadoop2.7\python\pyspark\__init__.py", line 104, in wrapper
    return func(self, **kwargs)
  File "C:\Python27\lib\site-packages\spark_sklearn\keyed_models.py", line 323, in __init__
    kwargs = KeyedEstimator._inferredParams(sklearnEstimator, self.__init__._input_kwargs)
AttributeError: 'function' object has no attribute '_input_kwargs'

This happens when I try to run the following line from the original example code on the welcome page:
km = KeyedEstimator(sklearnEstimator=LinearRegression(), yCol="y").fit(df)
I also tried the KMeans clustering example, but it caused the same error.

I downloaded the source code and checked line 323 in keyed_models.py, which was:
kwargs = KeyedEstimator._inferredParams(sklearnEstimator, self._input_kwargs)
Please correct me if I'm wrong, but the two do not seem to match.
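
The mismatch described above can be illustrated without Spark. In the sketch below, which is a simplified stand-in and not PySpark's actual source, one decorator stores the keyword arguments on the wrapper function and another stores them on the instance. A constructor that reads from the place the decorator did not write to fails with the same kind of AttributeError. All decorator and class names here are hypothetical.

```python
import functools


def keyword_only_on_function(func):
    """Hypothetical decorator: stash the kwargs on the wrapper function."""
    @functools.wraps(func)
    def wrapper(self, **kwargs):
        wrapper._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper


def keyword_only_on_instance(func):
    """Hypothetical decorator: stash the kwargs on the instance instead."""
    @functools.wraps(func)
    def wrapper(self, **kwargs):
        self._input_kwargs = kwargs
        return func(self, **kwargs)
    return wrapper


class MatchedFunctionStyle:
    @keyword_only_on_function
    def __init__(self, **kwargs):
        # Decorator and reader agree: both use the function attribute.
        self.params = self.__init__._input_kwargs


class MatchedInstanceStyle:
    @keyword_only_on_instance
    def __init__(self, **kwargs):
        # Decorator and reader agree: both use the instance attribute.
        self.params = self._input_kwargs


class Mismatched:
    @keyword_only_on_instance
    def __init__(self, **kwargs):
        # Reader expects the function attribute, but the decorator set
        # the instance attribute, so this lookup raises AttributeError.
        self.params = self.__init__._input_kwargs


ok_function = MatchedFunctionStyle(yCol="y").params
ok_instance = MatchedInstanceStyle(yCol="y").params

try:
    Mismatched(yCol="y")
    mismatch_error = None
except AttributeError as exc:
    mismatch_error = exc

print(ok_function, ok_instance, type(mismatch_error).__name__)
```

If this reading is right, the installed spark_sklearn and the installed pyspark disagree about where the keyword arguments live, and aligning the two versions would be the thing to check first.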

Non-deterministic results because of aggregation steps

Because the aggregation step is not deterministic, the data may be presented in different orders between runs of the same query. A number of algorithms (DBSCAN, for example) will then give different results.

The recommended solution is to sort all the data points in lexicographic order before fitting them:

sorted_data = data[numpy.lexsort(data.T)]
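
As a minimal sketch of the effect, the snippet below shuffles the rows of a small array and checks that the lexsort recipe above restores the same row order each time. The array contents and the `canonical_order` helper are made up for illustration.

```python
import numpy as np


def canonical_order(data):
    """Return the rows of a 2-D array sorted lexicographically.

    numpy.lexsort treats the LAST key as the primary one, so with data.T
    the last column is compared first, then the one before it, and so on.
    """
    return data[np.lexsort(data.T)]


# Illustrative data: four points in two dimensions.
points = np.array([[3.0, 1.0], [1.0, 2.0], [2.0, 1.0], [1.0, 1.0]])

# Simulate a different arrival order, as after a non-deterministic aggregation.
rng = np.random.default_rng(0)
shuffled = points[rng.permutation(len(points))]

sorted_a = canonical_order(points)
sorted_b = canonical_order(shuffled)
print(np.array_equal(sorted_a, sorted_b))
```

Because every column takes part in the sort, two rows can only tie when they are identical, so the sorted array does not depend on the arrival order.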

Update to latest scikit-learn release for deprecation and compatibility

Using the current head 0.2.0 release of spark-sklearn and the current release of scikit-learn (0.18.1), I'm getting the following deprecation warning:

/.../python3.4/site-packages/sklearn/cross_validation.py:44: DeprecationWarning: This module was deprecated in version 0.18 in favor of the model_selection module into which all the refactored classes and functions are moved. Also note that the interface of the new CV iterators are different from that of this module. This module will be removed in 0.20.
"This module will be removed in 0.20.", DeprecationWarning)

The library needs to be updated to use the new model_selection module and iterator interfaces.

In addition, due to changes in sklearn.model_selection.GridSearchCV, the attributes available on the fitted spark-sklearn.GridSearchCV are out of date.

sklearn.model_selection.GridSearchCV now has:

  • cv_results_ : dict of numpy (masked) ndarrays - A dict with keys as column headers and values as columns, that can be imported into a pandas DataFrame.
  • best_estimator_ : estimator - Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
  • best_score_ : float - Score of best_estimator on the left out data.
  • best_params_ : dict - Parameter setting that gave the best results on the hold out data.
  • best_index_ : int - The index (of the cv_results_ arrays) which corresponds to the best candidate parameter setting.
  • scorer_ : function - Scorer function used on the held out data to choose the best parameters for the model.
  • n_splits_ : int - The number of cross-validation splits (folds/iterations).

While spark-sklearn.GridSearchCV has:

  • grid_scores_ : list of named tuples
  • best_estimator_ : estimator - Estimator that was chosen by the search, i.e. estimator which gave highest score (or smallest loss if specified) on the left out data. Not available if refit=False.
  • best_score_ : float - Score of best_estimator on the left out data.
  • best_params_ : dict - Parameter setting that gave the best results on the hold out data.
  • scorer_ : function - Scorer function used on the held out data to choose the best parameters for the model.

The most critical difference is that sklearn added the more comprehensive cv_results_ which adds data that the formerly compatible grid_scores_ is lacking.
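
For reference, here is a short sketch of some of the newer attributes on plain scikit-learn's GridSearchCV (not the spark-sklearn class), reusing the iris example from the introduction. It assumes a scikit-learn version that ships sklearn.model_selection (0.18 or later).

```python
from sklearn import datasets, svm
from sklearn.model_selection import GridSearchCV

iris = datasets.load_iris()
parameters = {"kernel": ("linear", "rbf"), "C": [1, 10]}

clf = GridSearchCV(svm.SVC(gamma="auto"), parameters, cv=5)
clf.fit(iris.data, iris.target)

# cv_results_ is a dict of arrays with one entry per candidate setting.
n_candidates = len(clf.cv_results_["params"])

# best_index_ points into those arrays, so this lookup agrees with best_params_.
best_from_results = clf.cv_results_["params"][clf.best_index_]

print(n_candidates, best_from_results == clf.best_params_, clf.n_splits_)
```

Per-candidate columns such as mean_test_score sit alongside params in the same dict, which is the information grid_scores_ only partly covered.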

Convert IndexError: tuple index out of range

I am using Python 3.4.5, spark-2.1.0-bin-hadoop2.7, and sklearn 0.18.2. When I use Converter.toSKLearn() and then try to make predictions, I get an IndexError: tuple index out of range error.

Full example below:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
from pyspark.ml.classification import LogisticRegression
from spark_sklearn import Converter
import numpy as np

spark = SparkSession.builder.getOrCreate()

# Create fake data
features = [
    [1, 0, 0],
    [1, 0, 0],
    [0, 0, 1]
]
labels = [1, 1, 0]

data = []
for i in range(len(features)):
    data.append(
        Row(label=labels[i], features=Vectors.dense(features[i]))
    )

df = spark.sparkContext.parallelize(data).toDF()
df.show()

# Train a model
model = LogisticRegression()
model = model.fit(df)

# Convert to sklearn
converter = Converter(spark.sparkContext)
sk_model = converter.toSKLearn(model)

# Make some predictions
predictions = sk_model.predict_proba(np.vstack(features))
print(predictions)

Which raises this:

Traceback (most recent call last):
[...]
  File "<path>/.venv/lib/python3.4/site-packages/sklearn/linear_model/logistic.py", line 1286, in predict_proba
    return super(LogisticRegression, self)._predict_proba_lr(X)
  File "<path>/.venv/lib/python3.4/site-packages/sklearn/linear_model/base.py", line 350, in _predict_proba_lr
    prob = self.decision_function(X)
  File "<path>/.venv/lib/python3.4/site-packages/sklearn/linear_model/base.py", line 314, in decision_function
    n_features = self.coef_.shape[1]
IndexError: tuple index out of range

It seems that this can be solved by reshaping the coefficients:

# Convert to sklearn
converter = Converter(spark.sparkContext)
sk_model = converter.toSKLearn(model)

sk_model.coef_ = sk_model.coef_.reshape(1, -1)  # This is needed!

predictions = sk_model.predict_proba(np.vstack(features))
print(predictions)

Is this the right thing to do, or have I missed something?
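
The traceback is consistent with coef_ being a 1-D array: a 1-D NumPy array has a one-element shape tuple, so shape[1] does not exist, whereas scikit-learn's binary LogisticRegression normally stores coef_ with shape (1, n_features). The sketch below reproduces only that indexing step with a made-up coefficient vector. It does not call spark-sklearn, and whether the converter itself should return a 2-D array is left open.

```python
import numpy as np

# Made-up coefficients standing in for a converted binary model with 3 features.
coef_1d = np.array([0.5, -1.2, 0.3])
print(coef_1d.shape)

# Indexing the second dimension of a 1-D array fails, as in decision_function.
try:
    n_features = coef_1d.shape[1]
except IndexError as exc:
    error_message = str(exc)

# reshape(1, -1) gives one row and infers the column count from the data.
coef_2d = coef_1d.reshape(1, -1)
n_features = coef_2d.shape[1]
print(coef_2d.shape, n_features)
```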

CSR Matrix Support for Spark 2.0

There's no support for CSR matrices in Spark 2.0.

To replicate, run python/run-tests.sh --nologcapture spark_sklearn.converter_test

pip install spark-sklearn-(version-no) doesn't work

Hi
I want to use the best_score_ attribute from GridSearchCV, but it looks like it is not present in the latest version of the library. When I try to uninstall the latest version and install an older version with the command
pip install spark-sklearn-(version-no)
it does not work. Please fix this or point me to some documentation on how to install older versions in my cluster environments.
Thanks

ImportError: Module not found with Azure Spark Cluster

I'm trying to run spark-sklearn's GridSearchCV on an Azure HDInsight cluster. Here is a code snippet:

        model = KerasRegressor(build_fn=build_model, verbose=0)

        kf = KFold(n_splits=self.cv_split, shuffle=True)  # Cross validation with k=5

        sc = SparkContext.getOrCreate()
        grid = GridSearchCV(sc=sc, estimator=model, param_grid=self.params, 
                            cv=kf, return_train_score=True, verbose=2,
                            fit_params={'epochs': nb_epoch, 'batch_size': 32})

        hist = grid.fit(x_train, y_train)

It works fine until I call the grid.fit method, which returns the following exception:

  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000001/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent failure: Lost task 2.3 in stage 0.0 (TID 13, wn0-bt-nsu.kkatsjzvwzuephdjshji40kxae.ax.internal.cloudapp.net, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/worker.py", line 166, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'ml'

...

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/worker.py", line 166, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/worker.py", line 55, in read_command
    command = serializer._read_with_length(file)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/serializers.py", line 169, in _read_with_length
    return self.loads(obj)
  File "/mnt/resource/hadoop/yarn/local/usercache/nsusshuser/appcache/application_1526397916826_0020/container_e01_1526397916826_0020_01_000002/pyspark.zip/pyspark/serializers.py", line 455, in loads
    return pickle.loads(obj, encoding=encoding)
ImportError: No module named 'ml'

The ml module is part of our project. I checked sys.modules on the driver and it is in there, so I don't really understand the error message. Can somebody help me out?

Is there any way to broadcast sk-learn model?

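I have tried code like this:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, Word2Vec
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from sklearn import svm
from spark_sklearn import GridSearchCV
import cPickle as pickle  # Python 2; on Python 3 use `import pickle`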

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

# iris = datasets.load_iris()
# print(iris.target)

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"]).select(f.split("text", "\\s+").alias("new_text"), "preds")

word2vec = Word2Vec(vectorSize=100, minCount=1, inputCol="new_text",
                    outputCol="features")
indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipline = Pipeline(stages=[word2vec, indexer])
ds = pipline.fit(documentDF).transform(documentDF)

data = ds.toPandas()
parameters = {'kernel': ('linear', 'rbf')}
svr = svm.SVC()
clf = GridSearchCV(session.sparkContext, svr, parameters)
X = [x.values for x in data.features.values]
y = [int(x) for x in data.labels.values]
model = clf.fit(X, y)
modelB = session.sparkContext.broadcast(pickle.dumps(model))
wow = documentDF.rdd.map(lambda row: pickle.loads(modelB.value).transform(row["features"].values)).collect()
print(wow)

But the code fails because the sk-learn model cannot be pickled. Is there any way to broadcast the model?
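
scikit-learn estimators can generally be pickled, so one possible cause (an assumption, not confirmed in this thread) is that the fitted spark-sklearn GridSearchCV wrapper holds a reference to the SparkContext, which cannot be serialized. The sketch below uses plain scikit-learn and made-up data to show a pickle round trip for the fitted best estimator only. The resulting bytes are the kind of payload one might then pass to sparkContext.broadcast.

```python
import pickle

import numpy as np
from sklearn import svm
from sklearn.model_selection import GridSearchCV

# Made-up, linearly separable toy data.
X = np.array([[0.0, 0.0], [0.1, 0.2], [0.2, 0.1],
              [1.0, 1.0], [0.9, 1.1], [1.1, 0.9]])
y = np.array([0, 0, 0, 1, 1, 1])

search = GridSearchCV(svm.SVC(gamma="auto"), {"kernel": ("linear", "rbf")}, cv=3)
search.fit(X, y)

# Serialize only the refitted estimator, not the search object itself.
payload = pickle.dumps(search.best_estimator_)
restored = pickle.loads(payload)

# The restored estimator should predict exactly like the original one.
same = np.array_equal(restored.predict(X), search.best_estimator_.predict(X))
print(same)
```

Note also that a fitted SVC exposes predict rather than transform, so the map in the snippet above would need to call predict on the restored estimator.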

best_params_ not supported by RandomizedSearchCV()

The documentation for RandomizedSearchCV implies that a best_params_ property is available after .fit() is called. This does not appear to be the case.

Here is the documentation in question:

https://github.com/databricks/spark-sklearn/blob/master/python/spark_sklearn/random_search.py#L162

Should I open a PR that implements a best_params_ property, or one that states in the documentation that best_params_ is not supported? It is a common API in sklearn-related libraries.

Question about further support

Hi,

I am looking for roadmap information on this project. Specifically, I would like to know whether there is any ongoing work or plan to support using a model generated by scikit-learn in a Spark PipelineModel, and whether there are plans to support other ML algorithms such as Decision Trees, K-means, etc.

Could anyone shed some light on this?

Thanks
Praveen

Training large number of models

Hi,

This looks fantastic and almost solves one of my problems.

Essentially I have a model to predict the amount of sales of a particular product for each day given various features (historical sales volumes, type of store, day of week, time of year, etc.). The training set fits easily in memory.

I want to train this model for thousands of different products (we need a model per product).

The features and model used are the same. The exact values of the features will change per product.

In pseudo code this looks like:

for (product in products) { # We can parallelise this loop on a single machine, but could we parallelise over spark?

   # Extract train dataset from hive over ODBC
   # Train model
   # Output predictions
}

Any thoughts on whether spark-sklearn could be used to support this?
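
One way to sketch the loop above is with joblib, which the deprecation notice already recommends for distributing work. The example below trains one small scikit-learn model per product on made-up data, using local threads so that it runs without a cluster. `load_training_data` and `train_one` are hypothetical helpers; a real version would query Hive instead of generating random numbers. Wrapping the Parallel call in parallel_backend('spark') after register_spark() should, in principle, send the same tasks to a Spark cluster, but that is an assumption and is not exercised here.

```python
import numpy as np
from joblib import Parallel, delayed
from sklearn.linear_model import LinearRegression


def load_training_data(product_id):
    """Hypothetical stand-in for the Hive/ODBC extract: returns made-up (X, y)."""
    rng = np.random.default_rng(product_id)
    X = rng.normal(size=(50, 3))
    y = X @ np.array([1.0, 2.0, 3.0]) + product_id
    return X, y


def train_one(product_id):
    """Train one model for one product and return a single prediction."""
    X, y = load_training_data(product_id)
    model = LinearRegression().fit(X, y)
    return product_id, float(model.predict(X[:1])[0])


products = [1, 2, 3, 4]

# Each product is an independent task, so the loop body can run in parallel.
results = Parallel(n_jobs=2, backend="threading")(
    delayed(train_one)(p) for p in products
)
predictions = dict(results)
print(sorted(predictions))
```

The key design point is that each task loads its own data and returns only a small result, so nothing large has to be shipped from the driver to the workers.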
