Comments (26)
Hi Dimitar,
About item 3, I did a small fix some time ago following your comments. The reason that LuceneRDD consumed 17GB of memory was that on .cache() I was also caching all the elements of the RDD in memory (now it is stored only on disk).
See 4e833e8
Thank you for reporting the issues!
Any chance you can also do a release with this change that is compatible with Spark 2.1?
I just made a release with version 0.3.5 including the above change. It should be available in Maven Central in a few hours.
from spark-lucenerdd.
Found an error on my side. Not valid anymore, sorry about that.
from spark-lucenerdd.
A few suggestions:
- Why do you need to coalesce(1) your DataFrame?
- Can you try to use only a few of the 5 fields? Note that all fields will be analyzed.
- Check out also this example: https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala#L543
- Do you have access to a cluster where you can run the same code? The above setup sounds quite I/O expensive, so you will need to wait for minutes (hours?).
Use df.sample() to debug your approach and then gradually allow more data into LuceneRDD. This might help to see how you scale as well.
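The sampling advice above can be sketched as a small ramp-up helper. This is only an illustration: the object name, the fractions, and the `job` callback are assumptions, and the only Spark call used is `Dataset.sample(withReplacement, fraction, seed)`. The caller supplies whatever work should be measured (for example, building the LuceneRDD and linking).

```scala
import org.apache.spark.sql.DataFrame

object SampleRampUp {
  // Hypothetical ramp-up schedule; adjust to the size of your data.
  val fractions: Seq[Double] = Seq(0.01, 0.1, 0.5, 1.0)

  /** Runs `job` on growing samples of `df` and returns (fraction, elapsed millis) pairs. */
  def rampUp(df: DataFrame, seed: Long = 42L)(job: DataFrame => Unit): Seq[(Double, Long)] =
    fractions.map { fraction =>
      // A fraction of 1.0 means the full DataFrame, so no sampling is needed.
      val sample =
        if (fraction >= 1.0) df
        else df.sample(withReplacement = false, fraction, seed)
      val start = System.nanoTime()
      job(sample)
      fraction -> (System.nanoTime() - start) / 1000000L
    }
}
```

If the elapsed time or memory use grows much faster than the fraction, that is a hint to stop and investigate before running on the full data.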
from spark-lucenerdd.
Hi zouzias,
I am reopening this issue as I am facing problems again.
- Coalesce(1) is due to TF-IDF. It's really important for me.
- I need all 5 fields for queries.
- I will check it.
- I have access to the cluster, but I don't have a problem with I/O wait; I am experiencing out-of-memory issues.
My problem is when performing the linking part in LuceneRDD.scala line 233. When I try to link a 700 000-record RDD to a LuceneRDD of 6 000 000 records, it just hangs at some point. With fewer records to link (tens of thousands) it is okay.
Is it normal for 6 000 000 records to have a 16GB LuceneRDD?
Thanks,
Dimitar
from spark-lucenerdd.
- Have you set store.mode = "disk" in your configuration? This ensures that the index is stored on disk (you still need to cache your RDD so that indexing happens only once).
https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/resources/reference.conf#L30
- Do you need all fields to be "analyzed"?
Currently, the library analyzes all fields, but I can make a diff to analyze only a selected set of fields. Let me know if this is the case.
Also, you may want to check the following options in the configuration. If you are OK with DOCS_AND_FREQS, this might reduce the index size.
// Text fields options as in org.apache.lucene.index.IndexOptions
//
// Other options are:
// "DOCS"
// "DOCS_AND_FREQS"
// "DOCS_AND_FREQS_AND_POSITIONS"
// "DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS"
// "NONE"
options = "docs_and_freqs_and_positions_and_offsets"
// Omit terms norms
terms.omitnorms = false
// Store term positions
terms.positions = false
// Store Term vectors (set true, otherwise LuceneRDD.termVectors(fieldName) will fail)
terms.vectors = true
- Do you run out of memory, or does your application run for a long time?
Notice that linkage tasks might take many hours, since you use only 1 partition.
"Is it normal for 6 000 000 records to have a 16GB LuceneRDD?"
Yes, JVM-based applications like this one are quite memory hungry.
from spark-lucenerdd.
Thanks a lot for the prompt reply.
- I will try with store mode disk. Maybe I am missing something, but could you give me a hint on how to set up the configuration? I didn't find any documentation on this. Thanks!
- Actually no, I need 3 out of 5 fields to be analyzed, but I need the others as IDs in order to join other info later. If you can make such a diff, that would be nice.
- My application runs for a long time and the executor gets disconnected from the driver at the end.
from spark-lucenerdd.
- You can check how Travis is set up with environmental variables during assembly:
https://github.com/zouzias/spark-lucenerdd/blob/master/.travis.yml
In a nutshell, you need to set the environmental variable, say -Dlucenerdd.analyzer.name=XXX
- I will look at this.
- I see, can you share the logs here?
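As a rough sketch of how such a `-D` option could reach the executors from application code, the snippet below sets Spark's `spark.executor.extraJavaOptions`. The object and method names are made up for illustration; the key `lucenerdd.index.store.mode` is the one the library prints in its own log line later in this thread. Note that this call replaces any executor Java options already set on the configuration.

```scala
import org.apache.spark.SparkConf

object LuceneRddJvmOptions {
  // JVM option that overrides the store mode; key as reported in the library's log output.
  val storeModeOnDisk: String = "-Dlucenerdd.index.store.mode=disk"

  /** Returns the same SparkConf with the option passed to every executor JVM. */
  def withLuceneOptions(conf: SparkConf): SparkConf =
    conf.set("spark.executor.extraJavaOptions", storeModeOnDisk)
}
```

For the driver JVM the option usually has to be given at launch time, for example through `spark-submit --driver-java-options`, because the driver is already running when the application builds its `SparkConf`.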
from spark-lucenerdd.
- Thanks, it worked like a charm; now store.mode is disk.
By the way, on my LuceneRDD: val luceneRdd = LuceneRDD(rdd.coalesce(1))
If I execute luceneRdd.cache everything is okay.
But if I execute luceneRdd.persist(StorageMode=MEMORY_AND_DISK)
the following happens (I am not sure whether this is expected, with caching meant to be controlled by the store.mode parameter, or really a bug; that's why I am not raising a new issue):
18/11/12 21:55:45 ERROR TaskSetManager: Task 0.0 in stage 114.0 (TID 1649) had a not serializable result: org.apache.lucene.facet.FacetsConfig
Serialization stack:
- object not serializable (class: org.apache.lucene.facet.FacetsConfig, value: org.apache.lucene.facet.FacetsConfig@6ecbc8fb)
- field (class: org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition, name: FacetsConfig, type: class org.apache.lucene.facet.FacetsConfig)
- object (class org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition, org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition@51f3cd43); not retrying
- Logs coming up in next comment.
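The serialization stack above says that a field of type FacetsConfig inside LuceneRDDPartition could not be serialized. The following is a generic, library-independent illustration of that failure mode with plain Java serialization; all class names are hypothetical stand-ins, and it is not a description of how spark-lucenerdd is or should be implemented.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for a third-party class that does not implement Serializable.
class NativeHandle(val name: String)

// Keeps the handle as a regular field, so serializing the holder fails.
class EagerHolder(val handle: NativeHandle) extends Serializable

// Marks the handle @transient and rebuilds it lazily on first access.
class LazyHolder(val name: String) extends Serializable {
  @transient lazy val handle: NativeHandle = new NativeHandle(name)
}

object SerializationCheck {
  /** Right(number of bytes) on success, Left(message) if a field is not serializable. */
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Serializing an `EagerHolder` yields a `Left`, while a `LazyHolder` serializes because the transient field is skipped and recreated after deserialization.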
from spark-lucenerdd.
Log from driver:
18/11/13 14:38:59 INFO DAGScheduler: Submitting 27 missing tasks from ShuffleMapStage 101 (MapPartitionsRDD[39] at persist at CompanyMatcher.scala:34) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
18/11/13 14:38:59 INFO TaskSchedulerImpl: Adding task set 101.0 with 27 tasks
18/11/13 14:39:00 WARN TransportChannelHandler: Exception in connection from /192.168.129.50:59774
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
at java.lang.Thread.run(Thread.java:748)
18/11/13 14:39:00 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.129.50: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 0.0 in stage 168.0 (TID 5132, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 0.0 in stage 103.1 (TID 6055, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 3.0 in stage 103.1 (TID 6058, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 6.0 in stage 103.1 (TID 6061, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 5.0 in stage 103.1 (TID 6060, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 2.0 in stage 103.1 (TID 6057, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 1.0 in stage 103.1 (TID 6056, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 4.0 in stage 103.1 (TID 6059, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 INFO DAGScheduler: Executor lost: 0 (epoch 182)
18/11/13 14:39:00 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
18/11/13 14:39:00 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
Log from stderr of executor:
18/11/13 14:36:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/11/13 14:36:06 INFO LuceneRDDPartition: [partId=0]Indexing process completed at 2018-11-13T14:36:06.255Z...
18/11/13 14:36:06 INFO LuceneRDDPartition: [partId=0]Indexing process took 346 seconds...
18/11/13 14:36:46 INFO LuceneRDDPartition: [partId=0]Indexed 6277810 documents
18/11/13 14:38:58 WARN BlockManager: Block rdd_618_0 could not be removed as it was not found on disk or in memory
18/11/13 14:38:58 ERROR Executor: Exception in task 0.0 in stage 170.0 (TID 5133)
java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:252)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:209)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/11/13 14:38:58 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 5133,5,main]
java.lang.OutOfMemoryError: Java heap space
at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:252)
at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:209)
at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/11/13 14:38:58 INFO DiskBlockManager: Shutdown hook called
from spark-lucenerdd.
I've managed to isolate only the Lucene part in order to exclude other possible issues. During the .link part, with 6.2M records for the LuceneRDD and 700 000 records for the other RDD, I receive a GC overhead error with the standard GC and 50gb for the executor.
18/11/14 11:18:56 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 163, 192.168.129.50, executor 0): java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.lucene.codecs.lucene50.Lucene50PostingsReader.newTermState(Lucene50PostingsReader.java:145)
at org.apache.lucene.codecs.blocktree.SegmentTermsEnumFrame.<init>(SegmentTermsEnumFrame.java:98)
at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.getFrame(SegmentTermsEnum.java:215)
at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.pushFrame(SegmentTermsEnum.java:241)
at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.seekExact(SegmentTermsEnum.java:471)
at org.apache.lucene.index.TermContext.build(TermContext.java:99)
at org.apache.lucene.search.TermQuery.createWeight(TermQuery.java:211)
at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:463)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:584)
at org.apache.lucene.search.IndexSearcher.searchAfter(IndexSearcher.java:439)
at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:450)
at org.zouzias.spark.lucenerdd.query.LuceneQueryHelpers$.searchParser(LuceneQueryHelpers.scala:107)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.query(LuceneRDDPartition.scala:135)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2$$anonfun$apply$3.apply(LuceneRDD.scala:236)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2$$anonfun$apply$3.apply(LuceneRDD.scala:235)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2.apply(LuceneRDD.scala:235)
from spark-lucenerdd.
Is this during index creation (after caching and running .link(...))?
From a high-level point of view, there are two points where you might run out of memory:
- During index creation, but this should not happen after storing the index on disk (store.mode=disk).
- During the execution of .link, since you will broadcast the whole queries RDD. To avoid this, you need to make sure that the driver has enough memory to collect the 700K queries.
Can you extract from the logs which case happens here?
from spark-lucenerdd.
It's in LuceneRDD.scala on line 233, so I guess it's during the linking itself (execution of the queries).
With 30gb of executor memory I get either GC overhead or OOM. With 50gb for the executor I get (I've set the timeout to 1000 seconds):
18/11/14 12:17:04 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 1007154 ms exceeds timeout 1000000 ms
18/11/14 12:17:04 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.129.50: Executor heartbeat timed out after 1007154 ms
I am now running the same job on a bigger machine with 200gb of memory and will see what the result is.
from spark-lucenerdd.
You need to increase your driver's memory, if you run OOM in line
Put 64gb of memory on the driver and executor as well, since you have a big fat machine.
Tip: You can try to see if you can link the first 100K queries, and if so, run your job in 7 batches with a lower memory footprint.
from spark-lucenerdd.
I pushed a diff that allows you to select a subset of column names that will not be analyzed:
You can use this diff by setting the version to 0.3.4-SNAPSHOT
and adding the following line to your SBT build:
resolvers += "OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
from spark-lucenerdd.
Thanks a lot Zouzias! It will be really useful in my case.
I have another question for you.
On the big machine with 200gb I managed to succeed with 180 for the executor and 20 for the driver. But I cannot afford to use such a big machine; it was just for a test. I will try to squeeze into the 64gb machine.
Linking an RDD with 80 000 records works just fine.
Then for the 700 000 RDD I tried to split it with 50 000 max record size but I get the following result:
I am doing the following:
val luceneRDD: LuceneRDD[Row] = LuceneRDD(
  sourceToIndex,
  TermAnalyzerName,
  QueryAnalyzerName,
  SimilarityAlgorithm
)
luceneRDD.cache

def tokenizeString: UserDefinedFunction =
  udf((str: String) => {
    if (str != null) {
      val analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET)
      val result: ListBuffer[String] = ListBuffer()
      try {
        val tokenStream = analyzer.tokenStream(null, new StringReader(str))
        tokenStream.reset()
        while (tokenStream.incrementToken()) {
          result += tokenStream
            .getAttribute(classOf[CharTermAttribute])
            .toString
        }
      } catch {
        case e: IOException => throw new RuntimeException()
      }
      if (result.isEmpty) {
        null
      } else {
        result.map(_.replace(":", "\\:"))
      }
    } else {
      null
    }
  })

val sourceToMatch = sourceDF
  .withColumn(TokenizedNameFieldName, tokenizeString(col(NamesFieldName)))
  .select(TokenizedNameFieldName)
  .rdd
  .map(_.getAs[Seq[String]](TokenizedNameFieldName))

def genereteLuceneScore(mode: String) = {
  val sourceWithTokenizedNameField = sourceToMatch
  val maxResultSize = args(1).toInt
  var numSlices = (sourceToMatch.count() / 50000).ceil.toInt
  numSlices = if (numSlices == 0) { 1 } else numSlices
  val split = Array.fill(numSlices)(1d)
  var result =
    sparkSession.sparkContext.emptyRDD[(Seq[String], Array[SparkScoreDoc])]
  for (source <- sourceWithTokenizedNameField.randomSplit(split)) {
    if (mode == "broad") {
      val a = generateLuceneBroadScores(source)
      a.count
      result = result.union(a)
    } else if (mode == "precise") {
      val a = generateLucenePreciseScores(source)
      a.count
      result = result.union(a)
    }
  }
  result
}

def generateLuceneBroadScores(
    source: RDD[Seq[String]]): RDD[(Seq[String], Array[SparkScoreDoc])] = {
  val sourceWithTokenizedNameField = sourceToMatch
  def generateBroadQuery(row: Seq[String]): String = {
    val queries = new java.util.ArrayList[Query]()
    val tokens: Seq[String] = row
    require(tokens.nonEmpty, "Tokens should be of size greater then 0")
    for (field <- Seq(NamesFieldName, DescriptionsFieldName)) {
      var booleanQueryBuilder = new BooleanQuery.Builder()
      for (token <- tokens) {
        booleanQueryBuilder.add(new TermQuery(new Term(field, token)),
                                BooleanClause.Occur.SHOULD)
      }
      val booleanQuery: BooleanQuery = booleanQueryBuilder.build()
      queries.add(booleanQuery)
    }
    val result =
      new DisjunctionMaxQuery(queries, 1).toString().replace("~", "")
    result
  }
  luceneRDD.link(sourceWithTokenizedNameField, generateBroadQuery, 20)
}

def generateLucenePreciseScores(
    source: RDD[Seq[String]]): RDD[(Seq[String], Array[SparkScoreDoc])] = {
  val sourceWithTokenizedNameField = sourceToMatch
  def generatePreciseQuery(row: Seq[String]): String = {
    val tokens: Seq[String] = row
    require(tokens.nonEmpty, "Tokens should be of size greater then 0")
    var booleanQueryBuilder = new BooleanQuery.Builder()
    for (token <- tokens) {
      booleanQueryBuilder.add(
        new TermQuery(new Term(NamesFieldName, token)),
        BooleanClause.Occur.SHOULD)
    }
    val result: String = (booleanQueryBuilder.build()).toString
    result
  }
  luceneRDD.link(sourceWithTokenizedNameField, generatePreciseQuery, 20)
}
I am calling count on each iteration to force the generation of the DataFrame; otherwise I guess the optimizer will combine the whole operation into one and I will again reach OOM. The funny thing is that even on the first iteration I get GC overhead and a really large amount of GC time, while matching a single dataset with 80 000 records goes okay. Do you have any idea why it behaves like that?
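One detail in the code above may be worth checking: generateLuceneBroadScores and generateLucenePreciseScores accept a source argument, but both pass sourceWithTokenizedNameField (that is, the full sourceToMatch) to luceneRDD.link, so each iteration appears to link the whole query set rather than one slice. Below is a minimal sketch of batching in which every batch is what actually gets linked. The object and method names are assumptions, the linking step is supplied by the caller, and `randomSplit` only yields approximately equal batches.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import scala.reflect.ClassTag

object BatchedLinkage {

  /** Ceiling division: how many batches are needed for `total` items (at least 1). */
  def numBatches(total: Long, batchSize: Long): Int = {
    require(batchSize > 0, "batchSize must be positive")
    math.max(1L, (total + batchSize - 1) / batchSize).toInt
  }

  /** Splits `queries` into roughly equal batches and links them one at a time. */
  def linkInBatches[Q, R: ClassTag](queries: RDD[Q], batchSize: Long)(
      linkBatch: RDD[Q] => RDD[R]): RDD[R] = {
    val n = numBatches(queries.count(), batchSize)
    val batches = queries.randomSplit(Array.fill(n)(1.0)).toSeq
    val linked = batches.map { batch =>
      // Persist to disk and force evaluation so each batch finishes before the next starts.
      val out = linkBatch(batch).persist(StorageLevel.DISK_ONLY)
      out.count()
      out
    }
    queries.sparkContext.union(linked)
  }
}
```

With the names from the post, a call could look like `BatchedLinkage.linkInBatches(sourceToMatch, 50000L)(batch => luceneRDD.link(batch, generateBroadQuery, 20))`, assuming generateBroadQuery is in scope at the call site.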
from spark-lucenerdd.
One more input that I find rather strange.
With an index of 6 300 000 records, another RDD of 80 000 records to link, and 1 partition for LuceneRDD, everything goes okay.
With an index of 6 300 000 records, another RDD of 700 000 records, and various numbers of partitions for LuceneRDD, I always end up with GC overhead limit exceeded on line 233.
- It's not the driver memory: I've verified that, with enough executor memory, 20gb for the driver is sufficient.
- Shouldn't using many partitions split the work into chunks and not consume so much memory at once?
Do you have any idea why neither having many partitions nor splitting the input into equally sized chunks is working?
Thanks a lot,
Dimitar
from spark-lucenerdd.
Hi there,
Do you run your Spark job on a cluster or in local mode?
If you run it in local mode, then your whole Spark job runs under a single JVM that consumes in total the amount of the driver's plus the executors' memory.
Regarding (2), in standalone mode I am not sure how the memory management in the executors happens. As far as I remember, the executors run within the same JVM (as the driver) so the executors compete for memory.
from spark-lucenerdd.
Hi Zouzias, I am running on a standalone cluster with 1 executor.
Yes, the driver and executor have to split the memory. I have verified that the driver does not need any more than 20gb for the process to complete successfully, so 40gb are left for the executor.
There is no further way to tune it; I've tried everything.
By the way, I've tried native Lucene.
- I collect all data of the RDD that we index to the driver and create an index in a local dir.
- I collect a single column that is used to make the queries to the driver.
- I construct all the queries.
- I run them on the index that has been built in the local folder.
- I build a new RDD from the results.
This way I managed to run the linking on the same amount of data.
This made me wonder why LuceneRDD has such a big overhead; might it be due to Spark? Anyway, there are a couple of questions that I could not answer myself and that blocked me from running my application with LuceneRDD. I am not seeking an answer, as I will use native Lucene, but they might help you think of ways to optimize the library, as it is a really useful one.
- Even with many partitions I get GC overhead when matching a big DataSet to a big index.
- Even when splitting the big DataSet into several chunks I get GC overhead in the linking process.
- The LuceneRDD is 17GB while the index in native Lucene is 1.6GB; this is 10 times larger.
Best regards,
Dimitar
from spark-lucenerdd.
Hi Dimitar,
About item 3, I did a small fix some time ago following your comments. The reason that LuceneRDD consumed 17GB of memory was that on .cache() I was also caching all the elements of the RDD in memory (now it is stored only on disk).
See 4e833e8
Thank you for reporting the issues!
from spark-lucenerdd.
Any chance you can also do a release with this change that is compatible with Spark 2.1?
from spark-lucenerdd.
"Currently, the library analyzed all fields, but I can make a diff to analyze only a selected set of fields. Let me know if this is the case."
Did you get to implement this?
from spark-lucenerdd.
I am not sure why but I am having this issue again
20/02/12 14:31:30 WARN TaskSetManager: Lost task 10549.0 in stage 6.0 (TID 11936, executor 186): ExecutorLostFailure (executor 186 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 27.7 GB of 27.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
This is the configuration that I have :
lucenerdd {
  linker.method = "cartesian"
  index {
    store.mode = "disk"
  }
}
20/02/12 17:59:07 INFO LuceneRDDPartition: Config parameter lucenerdd.index.store.mode is set to 'disk'
20/02/12 17:59:07 INFO LuceneRDDPartition: Lucene index will be storage in disk
I am not sure what could be causing OOM when the index is stored on disk.
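The YARN message above refers to the container's physical memory limit, which covers the executor heap plus the memory overhead allowance, and it suggests boosting spark.yarn.executor.memoryOverhead. A minimal sketch of setting both values before the Spark context is created follows; the object name and the sizes in the usage note are placeholders rather than recommendations, and the overhead is given in MiB.

```scala
import org.apache.spark.SparkConf

object YarnMemorySettings {
  /** Sets the executor heap and the YARN overhead allowance (in MiB) on `conf`. */
  def withOverhead(conf: SparkConf, heap: String, overheadMiB: Int): SparkConf =
    conf
      .set("spark.executor.memory", heap)
      // Key name as printed in the YARN message above.
      .set("spark.yarn.executor.memoryOverhead", overheadMiB.toString)
}
```

For example, `YarnMemorySettings.withOverhead(new SparkConf(), "20g", 8192)` would request a 20 GB heap with 8 GiB of overhead per executor. Whether a larger overhead is enough for a self-join of this size is something only a test run can tell.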
from spark-lucenerdd.
The line that seems to be causing this issue is flatMap at [LuceneRDD.scala:222].
It fails as a domino effect until the job fails.
My cluster is relatively big, so I am not sure why this is happening.
I am trying to self-join a DataFrame with 95 million records. It worked before for some reason, so I am not sure why it is failing now.
from spark-lucenerdd.
Do you have any idea @zouzias
from spark-lucenerdd.