streamdm's People

Contributors

abifet, benbenqiang, dependabot[bot], hmgomes, ioanna-ki, jianfengqian, jochenschneider, kanata2, mageirakos, nhnminh, pssf23, roywjt, smaniu, tianguangjian, zhangjiajin, zhensongqian

streamdm's Issues

Build failure (unresolved dependency)

Expected behavior

The build should succeed when running sbt package.

Observed behavior

While following the tutorial, the build stopped with a failure.

Steps to reproduce the issue

The build fails when running sbt package, with the following log:

[error] sbt.librarymanagement.ResolveException: unresolved dependency: com.typesafe.sbteclipse#sbteclipse-plugin;4.0.0: not found
[error] unresolved dependency: com.github.mpeltonen#sbt-idea;1.6.0: not found
[error] unresolved dependency: org.netbeans.nbsbt#nbsbt-plugin;1.1.4: not found
[error] unresolved dependency: com.orrsella#sbt-sublime;1.1.1: not found
[error] 	at sbt.internal.librarymanagement.IvyActions$.resolveAndRetrieve(IvyActions.scala:332)
[error] 	at sbt.internal.librarymanagement.IvyActions$.$anonfun$updateEither$1(IvyActions.scala:208)
[error] 	at sbt.internal.librarymanagement.IvySbt$Module.$anonfun$withModule$1(Ivy.scala:239)
[error] 	at sbt.internal.librarymanagement.IvySbt.$anonfun$withIvy$1(Ivy.scala:204)
[error] 	at sbt.internal.librarymanagement.IvySbt.sbt$internal$librarymanagement$IvySbt$$action$1(Ivy.scala:70)
[error] 	at sbt.internal.librarymanagement.IvySbt$$anon$3.call(Ivy.scala:77)
[error] 	at xsbt.boot.Locks$GlobalLock.withChannel$1(Locks.scala:95)
[error] 	at xsbt.boot.Locks$GlobalLock.xsbt$boot$Locks$GlobalLock$$withChannelRetries$1(Locks.scala:80)
[error] 	at xsbt.boot.Locks$GlobalLock$$anonfun$withFileLock$1.apply(Locks.scala:99)
[error] 	at xsbt.boot.Using$.withResource(Using.scala:10)
[error] 	at xsbt.boot.Using$.apply(Using.scala:9)
[error] 	at xsbt.boot.Locks$GlobalLock.ignoringDeadlockAvoided(Locks.scala:60)
[error] 	at xsbt.boot.Locks$GlobalLock.withLock(Locks.scala:50)
[error] 	at xsbt.boot.Locks$.apply0(Locks.scala:31)
[error] 	at xsbt.boot.Locks$.apply(Locks.scala:28)
[error] 	at sbt.internal.librarymanagement.IvySbt.withDefaultLogger(Ivy.scala:77)
[error] 	at sbt.internal.librarymanagement.IvySbt.withIvy(Ivy.scala:199)
[error] 	at sbt.internal.librarymanagement.IvySbt.withIvy(Ivy.scala:196)
[error] 	at sbt.internal.librarymanagement.IvySbt$Module.withModule(Ivy.scala:238)
[error] 	at sbt.internal.librarymanagement.IvyActions$.updateEither(IvyActions.scala:193)
[error] 	at sbt.librarymanagement.ivy.IvyDependencyResolution.update(IvyDependencyResolution.scala:20)
[error] 	at sbt.librarymanagement.DependencyResolution.update(DependencyResolution.scala:56)
[error] 	at sbt.internal.LibraryManagement$.resolve$1(LibraryManagement.scala:45)
[error] 	at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$12(LibraryManagement.scala:93)
[error] 	at sbt.util.Tracked$.$anonfun$lastOutput$1(Tracked.scala:68)
[error] 	at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$19(LibraryManagement.scala:106)
[error] 	at scala.util.control.Exception$Catch.apply(Exception.scala:224)
[error] 	at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11(LibraryManagement.scala:106)
[error] 	at sbt.internal.LibraryManagement$.$anonfun$cachedUpdate$11$adapted(LibraryManagement.scala:89)
[error] 	at sbt.util.Tracked$.$anonfun$inputChanged$1(Tracked.scala:149)
[error] 	at sbt.internal.LibraryManagement$.cachedUpdate(LibraryManagement.scala:120)
[error] 	at sbt.Classpaths$.$anonfun$updateTask$5(Defaults.scala:2519)
[error] 	at scala.Function1.$anonfun$compose$1(Function1.scala:44)
[error] 	at sbt.internal.util.$tilde$greater.$anonfun$$u2219$1(TypeFunctions.scala:40)
[error] 	at sbt.std.Transform$$anon$4.work(System.scala:67)
[error] 	at sbt.Execute.$anonfun$submit$2(Execute.scala:269)
[error] 	at sbt.internal.util.ErrorHandling$.wideConvert(ErrorHandling.scala:16)
[error] 	at sbt.Execute.work(Execute.scala:278)
[error] 	at sbt.Execute.$anonfun$submit$1(Execute.scala:269)
[error] 	at sbt.ConcurrentRestrictions$$anon$4.$anonfun$submitValid$1(ConcurrentRestrictions.scala:178)
[error] 	at sbt.CompletionService$$anon$2.call(CompletionService.scala:37)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[error] 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[error] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[error] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[error] 	at java.lang.Thread.run(Thread.java:745)
[error] (update) sbt.librarymanagement.ResolveException: unresolved dependency: com.typesafe.sbteclipse#sbteclipse-plugin;4.0.0: not found
[error] unresolved dependency: com.github.mpeltonen#sbt-idea;1.6.0: not found
[error] unresolved dependency: org.netbeans.nbsbt#nbsbt-plugin;1.1.4: not found
[error] unresolved dependency: com.orrsella#sbt-sublime;1.1.1: not found
Project loading failed: (r)etry, (q)uit, (l)ast, or (i)gnore? i

Infrastructure details

  • Java Version: Java 8
  • Scala Version: Scala 2.12.6

Create global clustering assignment method

The following code appears in several methods in the implementation of Clustream and StreamKM:

// The fold accumulator is (bestClusterIndex, bestDistance, currentIndex);
// the result is the index of the closest cluster centre.
val assignedCl = getClusters.foldLeft((0, Double.MaxValue, 0))(
  (cl, centr) => {
    val dist = centr.in.distanceTo(x.in)
    if (dist < cl._2) (cl._3, dist, cl._3 + 1)
    else (cl._1, cl._2, cl._3 + 1)
  })._1

I suggest creating a ClusteringUtils object in which we put general methods like this, to avoid repetition and possible bugs due to changes.
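A minimal sketch of what such a shared helper could look like (ClusteringUtils and assignToClosest are placeholder names, not existing streamDM API; it assumes getClusters returns the centres as Examples):

import org.apache.spark.streamdm.core.Example

object ClusteringUtils {
  // Returns the index of the cluster whose centre is closest to x.
  // The fold accumulator is (bestIndex, bestDistance, currentIndex).
  def assignToClosest(clusters: Array[Example], x: Example): Int =
    clusters.foldLeft((0, Double.MaxValue, 0)) { (acc, centr) =>
      val dist = centr.in.distanceTo(x.in)
      if (dist < acc._2) (acc._3, dist, acc._3 + 1)
      else (acc._1, acc._2, acc._3 + 1)
    }._1
}

Clustream and StreamKM could then replace the duplicated fold with a call to ClusteringUtils.assignToClosest(getClusters, x).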

Record format with Kafka

Infrastructure details

  • Java Version: 1.8
  • Scala Version: 2.11
  • Spark version: 2.2
  • OS version: Windows 7 + CentOS 6.7
  • Cluster mode

I am using Kafka with StreamDM.
Suppose there is a data set for training, in a format such as CSV, like the following:

1,14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065
1,13.2,1.78,2.14,11.2,100,2.65,2.76,.26,1.28,4.38,1.05,3.4,1050
1,13.16,2.36,2.67,18.6,101,2.8,3.24,.3,2.81,5.68,1.03,3.17,1185

The first element is the label and the others can be assembled into the feature vector.

So, what is the format of these records if I use a producer to send them to a topic?

For example, if I have a sample with
label = '1'
feature = '14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065'

how should I construct a ProducerRecord from the label and features?
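A minimal sketch of one way to do this, assuming the stream reader on the streamDM side expects each record value to be the same comma-separated line that would appear in the CSV file (label first, then the features); the topic name wine-train and the broker address are placeholders:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)

val label = "1"
val features = "14.23,1.71,2.43,15.6,127,2.8,3.06,.28,2.29,5.64,1.04,3.92,1065"
// send the whole line as the record value, exactly as it appears in the file
producer.send(new ProducerRecord[String, String]("wine-train", s"$label,$features"))
producer.close()

Whether the label goes first or last, and which separator is used, has to match whatever the configured stream reader parses, so this only illustrates the producer side.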

How to make it work?

I got different errors reported by different sbt versions. How can I make it work after downloading it?
Should I install Scala and Spark before running sbt package?

streamDM-ONCE

Enhancement

Description

We propose the ONCE algorithm, which mainly addresses the problem of sequence mining. As signals arrive one after another, the algorithm can efficiently compute, from the dynamically arriving data, the frequency with which time-constrained serial episodes occur, and we have applied the algorithm on Spark Streaming. We would like to integrate this algorithm into Huawei's streamdm.

Resources

Our approach is described in detail in the paper ONCE: Counting the Frequency of Time-constrained Serial Episodes in a Streaming Sequence.

Command line
The file Sender.scala generates the test data, and ONCEStreaming.scala analyses the data with the ONCE algorithm.
First, run Sender.scala to generate the test data; it blocks waiting for a socket connection. Start the Spark cluster and then run ONCEStreaming.scala. Once the socket connection is established, the serial episode fragments generated and sent by Sender are counted.
Data source
The test data here are randomly generated by us. The digits 0 to 9 represent 10 different signals; each signal is paired with its arrival time to form a data pair. Every 50 data pairs are made into a queue and wrapped into an RDD as test data. For example, (2,50) means that signal 2 arrives at time 50. Each group of 50 pairs is put into an RDD and sent to the receiver over a socket, to be used as test data. The code that generates the data is as follows:

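(The generator code is not included here; a rough sketch of a sender matching the description above, purely illustrative and not the authors' Sender.scala, could look like this:)

import java.io.PrintWriter
import java.net.ServerSocket
import scala.util.Random

object Sender {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(9999)        // block until the receiver connects
    val socket = server.accept()
    val out = new PrintWriter(socket.getOutputStream, true)
    var time = 0
    while (true) {
      // one batch of 50 (signal, arrivalTime) pairs, signals drawn from 0..9
      val batch = (1 to 50).map { _ =>
        time += 1
        s"(${Random.nextInt(10)},$time)"
      }
      out.println(batch.mkString(" "))
      Thread.sleep(1000)
    }
  }
}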
Infrastructure details

  • Java Version: 1.8.0_152-ea
  • Scala Version: 2.11.8
  • Spark version: spark-2.1.0-bin-hadoop2.7
  • OS version: macOS 10.13.2
  • local mode

Using Hoeffding Tree as the base learner for Bagging

Expected behavior

It should be possible to use the Hoeffding Tree classifier as the base learner for the Bagging method.
Specifically, I tried executing Bagging with the Hoeffding Tree as the base learner and it failed.

Observed behavior

No classification results were generated. An exception was raised during execution.

Exception:

17/12/09 23:17:06 ERROR Executor: Exception in task 1.0 in stage 10.0 (TID 21)
java.lang.ClassCastException: org.apache.spark.streamdm.classifiers.trees.HoeffdingTreeModel cannot be cast to org.apache.spark.streamdm.core.ClassificationModel
	at org.apache.spark.streamdm.classifiers.meta.Bagging$$anonfun$ensemblePredict$1.apply$mcVI$sp(Bagging.scala:114)
	at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
	at org.apache.spark.streamdm.classifiers.meta.Bagging.ensemblePredict(Bagging.scala:113)
	at org.apache.spark.streamdm.classifiers.meta.Bagging$$anonfun$predict$1.apply(Bagging.scala:97)
	at org.apache.spark.streamdm.classifiers.meta.Bagging$$anonfun$predict$1.apply(Bagging.scala:97)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:193)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce the issue

Used 2 different datasets (electricity and covertype).

Command line

./spark.sh "200 EvaluatePrequential -l (meta.Bagging -l trees.HoeffdingTree) -s (FileReader -f ../data/elecNormNew.arff -k 4532 -d 10 -i 45312) -e (BasicClassificationEvaluator -c -m) -h" 1> result_elec.txt 2> log_elec.log
./spark.sh "200 EvaluatePrequential -l (meta.Bagging -l trees.HoeffdingTree)-s (FileReader -f ../data/covtypeNorm.arff -k 5810 -d 10 -i 581012) -e (BasicClassificationEvaluator -c -m) -h" 1> result_covt.txt 2> log_covt.log

Data source
elecNormNew.arff and covtypeNorm.arff

Infrastructure details

  • Java Version: 1.8.0_152
  • Scala Version: 2.10
  • Spark version: 2.2.0
  • OS version: macOS Sierra (10.12.4)
  • Cluster mode or local mode? Both

Clustream k-means processing time

Currently, CluStream does a k-means from the microclusters, to generate the final clusters. In the current implementation, this is done in every batch, in the train loop.

This is normal, if we want to evaluate the clusters which are generated by CluStream. However, since k-means is set by default to iterate 1000 times, this generates a scheduling delay in the processing of batches.

To give an example, for a dense synthetic dataset with 3 features, a socket stream sending ~600 records/sec, and a window of 10 seconds:

  • each batch is processed between 15 to 20 seconds,
  • this, in turn, generates at least 10 seconds of delay between the batches, which, obviously, adds up.

My solution would be to process k-means "lazily"; that is, to only process it when we need an assignment to evaluate, in assign. This makes sense for two reasons:

  • normally, assignment would only be on demand, and not in every loop,
  • since assignment is a different stream thread, it can be parallelised (i.e., performed at the same time as the train loop).

I suspect the same observation might hold for StreamKM++, although I'm not sure.
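A sketch of the kind of lazy evaluation suggested above (class, field, and method names are illustrative placeholders, not the current Clustream API): keep only the microclusters up to date in the train loop, mark them dirty, and run k-means at most once per assignment request:

import org.apache.spark.streamdm.core.Example

class LazyClustream {
  private var microclusters: Array[Example] = Array()
  private var centers: Array[Example] = Array()
  private var dirty = true

  def trainOnBatch(batch: Array[Example]): Unit = {
    // only the cheap microcluster update happens in the train loop
    microclusters = updateMicroclusters(microclusters, batch)
    dirty = true
  }

  def getClusters: Array[Example] = {
    // k-means runs only when an assignment actually needs the final clusters
    if (dirty) {
      centers = kmeans(microclusters, k = 10, iterations = 1000)
      dirty = false
    }
    centers
  }

  // stand-ins for the existing streamDM routines
  private def updateMicroclusters(mcs: Array[Example], batch: Array[Example]): Array[Example] = mcs
  private def kmeans(points: Array[Example], k: Int, iterations: Int): Array[Example] = points
}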

Issue with Kafka Reader

Hello,
I have an issue with Kafka. I ran a command which read the dataset using FileReader, and it was OK;
then I tried to do the same using KafkaReader, but it fails. The log results are shown below:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/05/26 07:01:30 INFO SparkContext: Running Spark version 2.1.0
18/05/26 07:01:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/05/26 07:01:31 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.17.177 instead (on interface eno33557248)
18/05/26 07:01:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/05/26 07:01:31 INFO SecurityManager: Changing view acls to: root
18/05/26 07:01:31 INFO SecurityManager: Changing modify acls to: root
18/05/26 07:01:31 INFO SecurityManager: Changing view acls groups to:
18/05/26 07:01:31 INFO SecurityManager: Changing modify acls groups to:
18/05/26 07:01:31 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
18/05/26 07:01:31 INFO Utils: Successfully started service 'sparkDriver' on port 44017.
18/05/26 07:01:31 INFO SparkEnv: Registering MapOutputTracker
18/05/26 07:01:31 INFO SparkEnv: Registering BlockManagerMaster
18/05/26 07:01:31 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/05/26 07:01:31 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/05/26 07:01:31 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-0a7ed5a0-688d-4684-8962-9f9c398dc979
18/05/26 07:01:31 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
18/05/26 07:01:31 INFO SparkEnv: Registering OutputCommitCoordinator
18/05/26 07:01:32 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/05/26 07:01:32 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.17.177:4040
18/05/26 07:01:32 INFO SparkContext: Added JAR file:/root/Downloads/spark-streaming-kafka-0-10_2.10-2.1.0.jar at spark://192.168.17.177:44017/jars/spark-streaming-kafka-0-10_2.10-2.1.0.jar with timestamp 1527332492071
18/05/26 07:01:32 INFO SparkContext: Added JAR file:/root/streamDM/scripts/../target/scala-2.10/streamdm-spark-streaming-_2.10-0.2.jar at spark://192.168.17.177:44017/jars/streamdm-spark-streaming-_2.10-0.2.jar with timestamp 1527332492072
18/05/26 07:01:32 INFO Executor: Starting executor ID driver on host localhost
18/05/26 07:01:32 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45190.
18/05/26 07:01:32 INFO NettyBlockTransferService: Server created on 192.168.17.177:45190
18/05/26 07:01:32 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/05/26 07:01:32 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.17.177:45190 with 366.3 MB RAM, BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.17.177, 45190, None)
18/05/26 07:01:32 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.17.177, 45190, None)
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$
	at org.apache.spark.streamdm.streams.KafkaReader.getExamples(KafkaReader.scala:62)
	at org.apache.spark.streamdm.tasks.EvaluatePrequential.run(EvaluatePrequential.scala:71)
	at org.apache.spark.streamdm.streamDMJob$.main(streamDMJob.scala:56)
	at org.apache.spark.streamdm.streamDMJob.main(streamDMJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtils$
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 13 more
18/05/26 07:01:32 INFO SparkContext: Invoking stop() from shutdown hook
18/05/26 07:01:32 INFO SparkUI: Stopped Spark web UI at http://192.168.17.177:4040
18/05/26 07:01:32 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/05/26 07:01:32 INFO MemoryStore: MemoryStore cleared
18/05/26 07:01:32 INFO BlockManager: BlockManager stopped
18/05/26 07:01:32 INFO BlockManagerMaster: BlockManagerMaster stopped
18/05/26 07:01:32 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/05/26 07:01:32 INFO SparkContext: Successfully stopped SparkContext
18/05/26 07:01:32 INFO ShutdownHookManager: Shutdown hook called
18/05/26 07:01:32 INFO ShutdownHookManager: Deleting directory /tmp/spark-23e4273b-e33d-4dbf-ace4-72597addd9b6

Infrastructure details

  • Java Version: 1.8.0_171
  • Scala Version: 2.10
  • Spark version: 2.1.0
  • OS version: CentOS 7
  • Cluster mode or local mode? local
  • Kafka Version: 0.10.2

Also, I imported org.apache.spark:spark-streaming-kafka-0-10_2.10:2.1.0 in my terminal using --packages and in my ~/.bashrc using an export.
It didn't work, so I changed my spark.sh in the scripts directory to:

$SPARK_HOME/bin/spark-submit \
  --jars /root/Downloads/spark-streaming-kafka-0-10_2.10-2.1.0.jar \
  --class "org.apache.spark.streamdm.streamDMJob" \
  --master local[2] \
  ../target/scala-2.10/streamdm-spark-streaming-_2.10-0.2.jar \
  $1

but the same error appears every time.
Can anyone help me with this?

Update Evaluation Metrics

Better support for multi-class evaluation (e.g. micro and macro average precision, recall, ...)
Addition of other Stream evaluation methods
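For reference, a small sketch of what macro- versus micro-averaged precision could look like when computed from per-class true/false positive counts (plain Scala, not tied to the existing BasicClassificationEvaluator API):

// tp(c) and fp(c) are the true/false positive counts for class c
def macroPrecision(tp: Array[Long], fp: Array[Long]): Double = {
  val perClass = tp.indices.map { c =>
    if (tp(c) + fp(c) == 0) 0.0 else tp(c).toDouble / (tp(c) + fp(c))
  }
  perClass.sum / perClass.size            // average the per-class precisions
}

def microPrecision(tp: Array[Long], fp: Array[Long]): Double = {
  val tpSum = tp.sum.toDouble
  if (tpSum + fp.sum == 0) 0.0 else tpSum / (tpSum + fp.sum)   // pool the counts first
}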

streamKM counter problem

I am new to this kind of programming, but I think StreamKM has an issue, since the train method increments a numInstances counter inside a foreachRDD. The value of numInstances is not stored on the driver, and as a result the getClusters method does not work properly. Any thoughts?

/**
 * Maintain the BucketManager for coreset extraction, given an input DStream of Example.
 * @param input a stream of instances
 */
def train(input: DStream[Example]): Unit = {
  input.foreachRDD(rdd => {
    rdd.foreach(ex => {
      // both updates run inside the executor closure, so the driver-side copies are not changed
      bucketmanager = bucketmanager.update(ex)
      numInstances += 1
    })
  })
}
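One possible way around the counter part of this, sketched below, is to count instances with a Spark accumulator (which is visible on the driver) instead of a plain var captured in the executor closure; whether this fits the rest of the StreamKM code, and how to handle bucketmanager itself, is left open:

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streamdm.core.Example

// assumes a StreamingContext `ssc` is in scope, as in the existing tasks
val numInstancesAcc = ssc.sparkContext.longAccumulator("numInstances")

def train(input: DStream[Example]): Unit = {
  input.foreachRDD { rdd =>
    rdd.foreach { _ =>
      // the accumulator fixes only the instance counter; the bucketmanager update
      // would still run in the executor closure and needs a separate treatment
      numInstancesAcc.add(1)
    }
  }
}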

data generators

Synthetic data generator enhancements:

  1. read/write interface
  2. RandomTreeGenerator enhancement
  3. RandomRBFGenerator
  4. concept drift generators

Documentation of Example

While reading the documentation of StreamDM I was wondering why the Example object has input and output attributes. I had to dig deeper and go through the code in order to understand that the output instance contains the labels.

Unless I missed something, it would be a nice small improvement to the documentation. ;)
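In other words (based on how Example is used elsewhere in the code base, e.g. x.in in the clustering code; the exact member names should be double-checked against core.Example, so treat this as an assumption):

import org.apache.spark.streamdm.core.Example

// example.in     : the input Instance, i.e. the feature values
// example.out    : the output Instance, i.e. the label(s)
// example.weight : the instance weight
def describe(example: Example): String =
  s"features=${example.in} label=${example.out} weight=${example.weight}"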

streamDM release

At the end of December, we will release a new version.
We should fix bugs and update the documentation.
The generators also need documentation to be added.

test shell is bad

Exception in thread "main" java.lang.Exception: -l learner (default: SGDLearner)
Learner to use
-e evaluator (default: BasicClassificationEvaluator)
Evaluator to use
-s streamReader (default: org.apache.spark.streamdm.streams.generators.RandomTreeGenerator)
Stream reader to use
-w resultsWriter (default: PrintStreamWriter)
Stream writer to use
-h shouldPrintHeader
Whether or not to print the evaluator header on the output file

at com.github.javacliparser.ClassOption.cliStringToObject(ClassOption.java:167)
at org.apache.spark.streamdm.streamDMJob$.main(streamDMJob.scala:55)
at org.apache.spark.streamdm.streamDMJob.main(streamDMJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Caused by: java.lang.IllegalArgumentException: Expecting option, found: '–s (FileReader –k 100 –d 60 –f ../data/mydata)'.
at com.github.javacliparser.Options.setViaCLIString(Options.java:150)
at com.github.javacliparser.ClassOption.cliStringToObject(ClassOption.java:159)
... 11 more

Hoeffding Tree bug

The addonWeight will be lost between RDDs. If the RDD size is smaller than numGrace, the Hoeffding Tree will never split.

Unable to run example

Following the documentation here, I'm unable to run an example. The following error is repeated when running ./spark.sh or one of the other examples with further options.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 3.0 failed 1 times, most recent failure: Lost task 1.0 in stage 3.0 (TID 6, localhost): java.lang.ArrayIndexOutOfBoundsException: 4
at org.apache.spark.streamdm.core.DenseInstance.add(DenseInstance.scala:85)
at org.apache.spark.streamdm.core.DenseInstance.add(DenseInstance.scala:28)
at org.apache.spark.streamdm.classifiers.SGDLearner$$anonfun$train$1$$anonfun$3.apply(SGDLearner.scala:92)
at org.apache.spark.streamdm.classifiers.SGDLearner$$anonfun$train$1$$anonfun$3.apply(SGDLearner.scala:86)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:144)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:144)
at org.apache.spark.InterruptibleIterator.foldLeft(InterruptibleIterator.scala:28)
at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:201)
at org.apache.spark.InterruptibleIterator.aggregate(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
at org.apache.spark.rdd.RDD$$anonfun$aggregate$1$$anonfun$21.apply(RDD.scala:1056)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
at org.apache.spark.SparkContext$$anonfun$36.apply(SparkContext.scala:1803)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Java version: 1.8.0_20
Scala version: 2.11.8
Spark version: 1.5.1

can't run hoeffding tree

Hi, I am a newbie to this library. I have installed Scala and my Java version is 8. After compiling the library, I enter the command below:
./spark.sh "200 EvaluatePrequential -l (meta.Bagging -l trees.HoeffdingTree) -s (FileReader -f covtypeNorm.arff -k 5810 -d 10 -i 581012) -e (BasicClassificationEvaluator -c -m) -h" 1> result_cov.txt 2> log_cov.log

The result in the log file is:
Exception in thread "main" java.lang.AbstractMethodError
at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
at org.apache.spark.streamdm.streams.FileReader.initializeLogIfNecessary(FileReader.scala:46)
at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
at org.apache.spark.streamdm.streams.FileReader.log(FileReader.scala:46)
at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
at org.apache.spark.streamdm.streams.FileReader.logInfo(FileReader.scala:46)
at org.apache.spark.streamdm.streams.FileReader$$anonfun$init$1.apply$mcVI$sp(FileReader.scala:93)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.streamdm.streams.FileReader.init(FileReader.scala:92)
at org.apache.spark.streamdm.streams.FileReader.getExampleSpecification(FileReader.scala:106)
at org.apache.spark.streamdm.tasks.EvaluatePrequential.run(EvaluatePrequential.scala:64)
at org.apache.spark.streamdm.streamDMJob$.main(streamDMJob.scala:56)
at org.apache.spark.streamdm.streamDMJob.main(streamDMJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:879)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Infrastructure details

  • Java Version: 8
  • Scala Version: 2.13.0-M1
  • Spark version: 2.3
  • OS version: CentOS 7
  • Cluster mode or local mode? local

Bug Report in TreeCoreset-StreamKM-BucketManager

Expected behavior

A TreeCoreset contains the methods to extract an m-sized array of Examples from an n-sized stream of Examples where n>m.

Observed behavior

The extracted coreset contains many duplicate Examples with weight = 0.0

I used a custom file as input since some of the readers have issues. I attached the file below.

Command line

./spark.sh "ClusteringTrainEvaluate -c (StreamKM -s 100 -w 1000) -s (SocketTextStreamReader)" 1> result.txt 2> log_streamKM.log

To reproduce the issue, several changes need to be made.

  1. In BucketManager, the method getCoreset needs to change, because break obviously does not work in Scala, so I used an "isFound" flag instead (any other suggestions instead of a flag are welcome):

    def getCoreset: Array[Example] = {
      if (buckets(L - 1).isFull) {
        buckets(L - 1).points.toArray
      } else {
        // find the first full bucket; Scala has no built-in break, hence the flag
        var i = 0
        var isFound = false
        var coreset = Array[Example]()
        while (i < L && !isFound) {
          if (buckets(i).isFull) {
            coreset = buckets(i).points.toArray
            isFound = true
          } else {
            i += 1
          }
        }
        // merge the remaining buckets into the coreset
        for (j <- i + 1 until L) {
          val examples = buckets(j).points.toArray ++ coreset
          val tree = new TreeCoreset
          coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),
            new Array[Example](0))
        }
        coreset
      }
    }

  2. You have to print the output of the coreset in StreamKM using Example.toString and Example.weight to see that some Examples are changing their weight values to 0.0 even though 1.0 is the default value. (That is probably happening in line 187 of TreeCoreset.scala, where the leaf node has no elements, so e.n is zero.)

  3. I did a pull request in StreamKM but the issues are still there. The method getCoreset is called every time a new Example arrives, which is a really slow process. I checked MOA's Java version (line 128) and there is a comment about calling getCoreset only after the streaming process is finished. So, in Spark, I am guessing we should call it when numInstances equals rdd.count divided by the number of workers, or by the repartitioning of the input.

  4. In TreeCoreset.scala, line 86, funCost sometimes has the value zero, so when costOfPoint is divided by funCost the resulting sum is NaN (see the guard sketched right after this list).
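Regarding point 4, a guard along the following lines would avoid the NaN (a sketch only; the names follow the issue text, not necessarily TreeCoreset.scala):

// return 0 when funCost is 0 instead of producing NaN via a division by zero
def safeNormalisedCost(costOfPoint: Double, funCost: Double): Double =
  if (funCost == 0.0) 0.0 else costOfPoint / funCost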

Any thoughts on how to solve the problem with the duplicates? Thanks in advance.

txt file with 1000 randomly chosen points
random2.txt

Infrastructure details

  • Java Version: 1.8.0_144
  • Scala Version: 2.11.6
  • Spark version: 2.2.0
  • OS version: Ubuntu 16.04
  • Spark Standalone cluster mode

Hoeffding Tree on spark cluster

Sir,
My M.Tech project is on Big Data Analytics, and I am planning to implement the Hoeffding Tree on Spark. Having seen your documentation, I want to ask how I should use StreamDM (the Hoeffding Tree) from my Scala code (implemented using Spark Notebook): is there a .jar file I should export to the Spark cluster? Your help will put my project on its way.
For your kind help I will always be thankful to you.
Thanking you in anticipation.

Runtime estimation in BasicClassificationEvaluator

Description

Include a runtime estimation for each batch in BasicClassificationEvaluator

Resources

This should be similar to the MOA output for evaluation tasks, i.e., output the runtime in seconds in a separate column.
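A minimal sketch of the idea, timing the evaluation of one batch and appending the elapsed seconds as an extra column (the function below is a placeholder, not the current BasicClassificationEvaluator API):

// wraps an arbitrary per-batch evaluation and appends the runtime in seconds,
// similar to MOA's evaluation-time column
def timedBatchResult(evaluateBatch: () => String): String = {
  val start = System.nanoTime()
  val metrics = evaluateBatch()
  val seconds = (System.nanoTime() - start) / 1e9
  f"$metrics,$seconds%.3f"
}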

Bug Report StreamKM Clusterer

Expected behavior

StreamKM should keep an up-to-date coreset tree while doing k-means clustering and assigning each input element to its nearest centre.

Observed behavior

All the input is assigned to one cluster. The instance counter, the updated bucketmanager, and the clusters variable keep their values only inside the foreachRDD action, so when we call the assign function there is no data to work with.

Steps to reproduce the issue

Used the iris.arff dataset.
Command line
./spark.sh "ClusteringTrainEvaluate -c (StreamKM) -s (SocketTextStreamReader)" 1> result_iris_streamKM.txt 2> log_iris_streamKM.log

There isn't an error message, but if you print the output of the assign function (clpairs), each element is assigned to cluster index 0.

Infrastructure details

  • Java Version: 1.8.0_144
  • Scala Version: 2.11.6
  • Spark version: 2.2.0
  • OS version: Ubuntu 16.04
  • Spark Standalone cluster mode

Documentation

We need to do a proper pass over the API and website documentation:

  1. Every non-private method needs a description, written using JavaDoc syntax.
  2. The website needs a Quick Start Guide, a Programming Guide, and updates to the previous documents.
