Hi,
I am trying to run the Spark-on-YARN Kinesis program from Spark's examples directory against Amazon Kinesis on an EMR cluster.
I've set up the credentials:
export AWS_ACCESS_KEY_ID=<ACCESS_KEY_ID>
export AWS_SECRET_KEY=<ACCESS_SECRET_KEY>
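(As an aside: my understanding is that the AWS SDK's default credentials provider chain looks for the variable named AWS_SECRET_ACCESS_KEY, and AWS_SECRET_KEY is only a legacy alias, so I export both to be safe. Sketch of what I run, with the same placeholders as above:)

```shell
# Sketch: export both secret-key variable names; the AWS SDK default
# provider chain reads AWS_SECRET_ACCESS_KEY, while some older tools
# read the legacy AWS_SECRET_KEY name.
export AWS_ACCESS_KEY_ID=<ACCESS_KEY_ID>
export AWS_SECRET_KEY=<ACCESS_SECRET_KEY>
export AWS_SECRET_ACCESS_KEY=<ACCESS_SECRET_KEY>
```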
A) The Kinesis word count producer, which ran successfully:
run-example org.apache.spark.examples.streaming.KinesisWordCountProducerASL mySparkStream https://kinesis.us-east-1.amazonaws.com 1 5
Sample logs:
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Putting records onto stream mySparkStream and endpoint https://kinesis.us-east-1.amazonaws.com at a rate of 1 records per second and 5 words per record
Sent 1 records
Sent 1 records
Sent 1 records
Sent 1 records
Sent 1 records
Totals
(0,6)
(1,2)
(2,3)
(3,2)
(4,2)
(5,4)
(6,1)
(7,1)
(8,1)
(9,3)
B) The normal Spark Streaming consumer, which is also working:
run-example org.apache.spark.examples.streaming.JavaKinesisWordCountASL mySparkStream https://kinesis.us-east-1.amazonaws.com
15/03/25 11:52:24 INFO storage.BlockManagerMaster: Updated info of block broadcast_30_piece0
15/03/25 11:52:24 INFO storage.BlockManager: Removing block broadcast_30
15/03/25 11:52:24 INFO storage.MemoryStore: Block broadcast_30 of size 2296 dropped from memory (free 278229292)
15/03/25 11:52:24 INFO storage.BlockManagerInfo: Added broadcast_48_piece0 in memory on localhost:52341 (size: 1447.0 B, free: 265.4 MB)
15/03/25 11:52:24 INFO spark.ContextCleaner: Cleaned broadcast 30
15/03/25 11:52:24 INFO storage.BlockManagerMaster: Updated info of block broadcast_48_piece0
15/03/25 11:52:24 INFO spark.ContextCleaner: Cleaned shuffle 14
15/03/25 11:52:24 INFO spark.ContextCleaner: Cleaned shuffle 13
15/03/25 11:52:24 INFO spark.SparkContext: Created broadcast 48 from broadcast at DAGScheduler.scala:839
15/03/25 11:52:24 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from Stage 92 (ShuffledRDD[92] at reduceByKey at JavaKinesisWordCountASL.java:159)
15/03/25 11:52:24 INFO scheduler.TaskSchedulerImpl: Adding task set 92.0 with 1 tasks
15/03/25 11:52:24 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 92.0 (TID 50, localhost, PROCESS_LOCAL, 1133 bytes)
15/03/25 11:52:24 INFO executor.Executor: Running task 0.0 in stage 92.0 (TID 50)
15/03/25 11:52:24 INFO storage.ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
15/03/25 11:52:24 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
15/03/25 11:52:24 INFO executor.Executor: Finished task 0.0 in stage 92.0 (TID 50). 1068 bytes result sent to driver
15/03/25 11:52:24 INFO scheduler.DAGScheduler: Stage 92 (print at JavaKinesisWordCountASL.java:173) finished in 0.020 s
15/03/25 11:52:24 INFO scheduler.DAGScheduler: Job 46 finished: print at JavaKinesisWordCountASL.java:173, took 0.042294 s
Time: 1427284344000 ms
(4,1)
(0,2)
(2,1)
(7,1)
(5,3)
(9,2)
C) The YARN-based program, which is not working:
run-example org.apache.spark.examples.streaming.JavaKinesisWordCountASLYARN mySparkStream https://kinesis.us-east-1.amazonaws.com
Spark assembly has been built with Hive, including Datanucleus jars on classpath
15/03/25 11:52:45 INFO spark.SparkContext: Running Spark version 1.3.0
15/03/25 11:52:45 WARN spark.SparkConf:
SPARK_CLASSPATH was detected (set to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/*:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar').
This is deprecated in Spark 1.0+.
Please instead use:
- ./spark-submit with --driver-class-path to augment the driver classpath
- spark.executor.extraClassPath to augment the executor classpath
15/03/25 11:52:45 WARN spark.SparkConf: Setting 'spark.executor.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around.
15/03/25 11:52:45 WARN spark.SparkConf: Setting 'spark.driver.extraClassPath' to '/home/hadoop/spark/conf:/home/hadoop/conf:/home/hadoop/spark/classpath/emr/:/home/hadoop/spark/classpath/emrfs/:/home/hadoop/share/hadoop/common/lib/:/home/hadoop/share/hadoop/common/lib/hadoop-lzo.jar' as a work-around.
15/03/25 11:52:46 INFO spark.SecurityManager: Changing view acls to: hadoop
15/03/25 11:52:46 INFO spark.SecurityManager: Changing modify acls to: hadoop
15/03/25 11:52:46 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/03/25 11:52:47 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/03/25 11:52:48 INFO Remoting: Starting remoting
15/03/25 11:52:48 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:59504]
15/03/25 11:52:48 INFO util.Utils: Successfully started service 'sparkDriver' on port 59504.
15/03/25 11:52:48 INFO spark.SparkEnv: Registering MapOutputTracker
15/03/25 11:52:48 INFO spark.SparkEnv: Registering BlockManagerMaster
15/03/25 11:52:48 INFO storage.DiskBlockManager: Created local directory at /mnt/spark/spark-120befbc-6dae-4751-b41f-dbf7b3d97616/blockmgr-d339d180-36f5-465f-bda3-cecccb23b1d3
15/03/25 11:52:48 INFO storage.MemoryStore: MemoryStore started with capacity 265.4 MB
15/03/25 11:52:48 INFO spark.HttpFileServer: HTTP File server directory is /mnt/spark/spark-85e88478-3dad-4fcf-a43a-efd15166bef3/httpd-6115870a-0d90-44df-aa7c-a6bd1a47e107
15/03/25 11:52:48 INFO spark.HttpServer: Starting HTTP Server
15/03/25 11:52:49 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 11:52:49 INFO server.AbstractConnector: Started [email protected]:44879
15/03/25 11:52:49 INFO util.Utils: Successfully started service 'HTTP file server' on port 44879.
15/03/25 11:52:49 INFO spark.SparkEnv: Registering OutputCommitCoordinator
15/03/25 11:52:49 INFO server.Server: jetty-8.y.z-SNAPSHOT
15/03/25 11:52:49 INFO server.AbstractConnector: Started [email protected]:4040
15/03/25 11:52:49 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/03/25 11:52:49 INFO ui.SparkUI: Started SparkUI at http://ip-10-80-175-92.ec2.internal:4040
15/03/25 11:52:50 INFO spark.SparkContext: Added JAR file:/home/hadoop/spark/lib/spark-examples-1.3.0-hadoop2.4.0.jar at http://10.80.175.92:44879/jars/spark-examples-1.3.0-hadoop2.4.0.jar with timestamp 1427284370358
15/03/25 11:52:50 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler
15/03/25 11:52:51 ERROR cluster.YarnClusterSchedulerBackend: Application ID is not set.
15/03/25 11:52:51 INFO netty.NettyBlockTransferService: Server created on 49982
15/03/25 11:52:51 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/03/25 11:52:51 INFO storage.BlockManagerMasterActor: Registering block manager ip-10-80-175-92.ec2.internal:49982 with 265.4 MB RAM, BlockManagerId(<driver>, ip-10-80-175-92.ec2.internal, 49982)
15/03/25 11:52:51 INFO storage.BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.deploy.yarn.ApplicationMaster$.sparkContextInitialized(ApplicationMaster.scala:581)
at org.apache.spark.scheduler.cluster.YarnClusterScheduler.postStartHook(YarnClusterScheduler.scala:32)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:541)
at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:642)
at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:75)
at org.apache.spark.streaming.api.java.JavaStreamingContext.<init>(JavaStreamingContext.scala:132)
at org.apache.spark.examples.streaming.JavaKinesisWordCountASLYARN.main(JavaKinesisWordCountASLYARN.java:127)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
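If I read the trace correctly, the NPE in ApplicationMaster.sparkContextInitialized (together with the earlier "Application ID is not set." error) suggests the driver created a YarnClusterScheduler without actually running inside a YARN ApplicationMaster — i.e. the example sets a yarn-cluster master programmatically while being launched as a plain client process by run-example. My guess at the intended invocation is to go through spark-submit in yarn-cluster mode instead, something like the sketch below (untested; the jar path and version are taken from the "Added JAR" log line above, and I assume the non-YARN example class works here):

```shell
# Sketch (untested): submit in yarn-cluster mode via spark-submit, so the
# driver runs inside a YARN ApplicationMaster and sparkContextInitialized
# has an AM to report to. Jar path/version assumed from the logs above.
spark-submit \
  --master yarn-cluster \
  --class org.apache.spark.examples.streaming.JavaKinesisWordCountASL \
  /home/hadoop/spark/lib/spark-examples-1.3.0-hadoop2.4.0.jar \
  mySparkStream https://kinesis.us-east-1.amazonaws.com
```

Is that the expected way to run the YARN variant, or is there something else I'm missing?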