
Comments (26)

zouzias commented on May 20, 2024

Hi Dimitar,
About item 3, I did a small fix some time ago following your comments. The reason LuceneRDD consumed 17GB of memory was that on .cache() I was also caching all the elements of the RDD in memory (now they are stored only on disk).
See 4e833e8
Thank you for reporting the issues!

"Any chance you can also do a release with this change that is compatible with Spark 2.1?"

I just made a release, version 0.3.5, including the above change. It should be available in Maven Central in a few hours.

stork-epitropov commented on May 20, 2024

Found an error on my side. Not valid anymore, sorry about that.

zouzias commented on May 20, 2024

A few suggestions:

Use df.sample() to debug your approach and then gradually allow more data into LuceneRDD. This might also help you see how your approach scales.
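
A rough sketch of that idea (assuming a DataFrame df with your fields, the spark-lucenerdd implicits in scope, and that the LuceneRDD(...) factory accepts a DataFrame as in the project's examples):

import org.zouzias.spark.lucenerdd._

// Index only a small sample first, then raise the fraction step by step
// to see how indexing time and memory grow.
val sampled = df.sample(withReplacement = false, fraction = 0.01)
val luceneSample = LuceneRDD(sampled)
luceneSample.cache()
println(s"Indexed ${luceneSample.count()} sampled records")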

stork-epitropov commented on May 20, 2024

Hi zouzias,

I am reopening this issue as I am facing problems again.

  1. Coalesce(1) is due to TF-IDF. It's really important for me.
  2. I need all 5 fields for queries.
  3. I will check it.
  4. I have access to the cluster, but I don't have a problem with IO wait; I am experiencing out-of-memory issues.

My problem is with the linking part, at LuceneRDD.scala line 233. When I try to link a 700,000-record RDD against a 6,000,000-record LuceneRDD it just hangs at some point. With fewer records to link (tens of thousands) it is okay.

Is it normal for 6,000,000 records to produce a 16GB LuceneRDD?

Thanks,
Dimitar

zouzias commented on May 20, 2024
  1. Have you set

store.mode = "disk"

in your configuration? This ensures that the index is stored on disk (you still need to cache your RDD so that indexing happens only once).

https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/resources/reference.conf#L30

  2. Do you need all fields to be "analyzed"?

Currently, the library analyzes all fields, but I can make a diff to analyze only a selected set of fields. Let me know if this is the case.

Also, you may want to check the following options in the configuration. If you are OK with DOCS_AND_FREQS, this might reduce the index size.

// Text fields options as in org.apache.lucene.index.IndexOptions
//
// Other options are:
// "DOCS"
// "DOCS_AND_FREQS"
// "DOCS_AND_FREQS_AND_POSITIONS"
// "DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS"
// "NONE"
options = "docs_and_freqs_and_positions_and_offsets"

// Omit terms norms
terms.omitnorms = false

// Store term positions
terms.positions = false

// Store term vectors (set true, otherwise LuceneRDD.termVectors(fieldName) will fail)
terms.vectors = true
  3. Do you run out of memory, or does your application run for a long time?

Notice that linkage tasks might take many hours, since you use only 1 partition.

"Is it normal for 6,000,000 records to produce a 16GB LuceneRDD?"

Yes, JVM-based applications like this one are quite memory-hungry.

stork-epitropov commented on May 20, 2024

Thanks a lot for the prompt reply.

  1. I will try with store mode disk. Maybe I am missing something, but could you give me a hint on how to set up the configuration? I didn't find any documentation on this. Thanks!
  2. Actually no, I need 3 out of 5 fields to be analyzed, but I need the others as IDs in order to join other info later. If you could make such a diff, that would be nice.
  3. My application runs for a long time and the executor gets disconnected from the driver at the end.

zouzias commented on May 20, 2024
  1. You can check how Travis is set up with environment variables during assembly:
    https://github.com/zouzias/spark-lucenerdd/blob/master/.travis.yml

In a nutshell, you need to set the corresponding JVM system property, e.g. -Dlucenerdd.analyzer.name=XXX (see also the configuration sketch after this list).

  2. I will look at this.

  3. I see, can you share the logs here?
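
For reference, a sketch of the same overrides via an application.conf on your classpath (typesafe-config reads it and it takes precedence over the library's reference.conf; the analyzer value below is only a placeholder):

lucenerdd {
  // hypothetical analyzer name; check reference.conf for the valid values
  analyzer.name = "standard"

  index {
    // keep the Lucene index on disk rather than in memory
    store.mode = "disk"
  }
}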

stork-epitropov commented on May 20, 2024
  1. Thanks, worked like a charm; store.mode is now disk.
    By the way, on my Lucene RDD: val luceneRdd = LuceneRDD(rdd.coalesce(1))
    If I execute luceneRdd.cache everything is okay.
    But if I execute luceneRdd.persist(StorageLevel.MEMORY_AND_DISK), the following happens (not sure if this is expected and caching should be controlled by the store.mode parameter, or really a bug; that's why I am not raising a new issue):
18/11/12 21:55:45 ERROR TaskSetManager: Task 0.0 in stage 114.0 (TID 1649) had a not serializable result: org.apache.lucene.facet.FacetsConfig
Serialization stack:
        - object not serializable (class: org.apache.lucene.facet.FacetsConfig, value: org.apache.lucene.facet.FacetsConfig@6ecbc8fb)
        - field (class: org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition, name: FacetsConfig, type: class org.apache.lucene.facet.FacetsConfig)
        - object (class org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition, org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition@51f3cd43); not retrying
  2. Logs coming up in the next comment.

stork-epitropov commented on May 20, 2024

Log from driver:

18/11/13 14:38:59 INFO DAGScheduler: Submitting 27 missing tasks from ShuffleMapStage 101 (MapPartitionsRDD[39] at persist at CompanyMatcher.scala:34) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
18/11/13 14:38:59 INFO TaskSchedulerImpl: Adding task set 101.0 with 27 tasks
18/11/13 14:39:00 WARN TransportChannelHandler: Exception in connection from /192.168.129.50:59774
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
	at java.lang.Thread.run(Thread.java:748)
18/11/13 14:39:00 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.129.50: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 0.0 in stage 168.0 (TID 5132, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 0.0 in stage 103.1 (TID 6055, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 3.0 in stage 103.1 (TID 6058, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 6.0 in stage 103.1 (TID 6061, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 5.0 in stage 103.1 (TID 6060, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 2.0 in stage 103.1 (TID 6057, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 1.0 in stage 103.1 (TID 6056, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 WARN TaskSetManager: Lost task 4.0 in stage 103.1 (TID 6059, 192.168.129.50, executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
18/11/13 14:39:00 INFO DAGScheduler: Executor lost: 0 (epoch 182)
18/11/13 14:39:00 INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
18/11/13 14:39:00 INFO BlockManagerMaster: Removed 0 successfully in removeExecutor

Log from stderr of executor:

18/11/13 14:36:06 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/11/13 14:36:06 INFO LuceneRDDPartition: [partId=0]Indexing process completed at 2018-11-13T14:36:06.255Z...
18/11/13 14:36:06 INFO LuceneRDDPartition: [partId=0]Indexing process took 346 seconds...
18/11/13 14:36:46 INFO LuceneRDDPartition: [partId=0]Indexed 6277810 documents
18/11/13 14:38:58 WARN BlockManager: Block rdd_618_0 could not be removed as it was not found on disk or in memory
18/11/13 14:38:58 ERROR Executor: Exception in task 0.0 in stage 170.0 (TID 5133)
java.lang.OutOfMemoryError: Java heap space
	at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
	at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
	at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
	at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:252)
	at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:209)
	at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
	at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
	at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
	at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
	at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
18/11/13 14:38:58 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 5133,5,main]
java.lang.OutOfMemoryError: Java heap space
	at java.util.IdentityHashMap.resize(IdentityHashMap.java:472)
	at java.util.IdentityHashMap.put(IdentityHashMap.java:441)
	at org.apache.spark.util.SizeEstimator$SearchState.enqueue(SizeEstimator.scala:174)
	at org.apache.spark.util.SizeEstimator$.visitArray(SizeEstimator.scala:252)
	at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:209)
	at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
	at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
	at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
	at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
	at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:109)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
18/11/13 14:38:58 INFO DiskBlockManager: Shutdown hook called

stork-epitropov commented on May 20, 2024

I've managed to isolate only the Lucene part in order to exclude other possible issues. During the .link part, with 6.2M records in the LuceneRDD and 700,000 records in the other RDD, I get a GC overhead error with the standard GC and 50GB for the executor.

18/11/14 11:18:56 WARN TaskSetManager: Lost task 0.0 in stage 10.0 (TID 163, 192.168.129.50, executor 0): java.lang.OutOfMemoryError: GC overhead limit exceeded
        at org.apache.lucene.codecs.lucene50.Lucene50PostingsReader.newTermState(Lucene50PostingsReader.java:145)
        at org.apache.lucene.codecs.blocktree.SegmentTermsEnumFrame.<init>(SegmentTermsEnumFrame.java:98)
        at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.getFrame(SegmentTermsEnum.java:215)
        at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.pushFrame(SegmentTermsEnum.java:241)
        at org.apache.lucene.codecs.blocktree.SegmentTermsEnum.seekExact(SegmentTermsEnum.java:471)
        at org.apache.lucene.index.TermContext.build(TermContext.java:99)
        at org.apache.lucene.search.TermQuery.createWeight(TermQuery.java:211)
        at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
        at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
        at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
        at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
        at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
        at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
        at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
        at org.apache.lucene.search.BooleanWeight.<init>(BooleanWeight.java:54)
        at org.apache.lucene.search.BooleanQuery.createWeight(BooleanQuery.java:207)
        at org.apache.lucene.search.IndexSearcher.createWeight(IndexSearcher.java:743)
        at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:463)
        at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:584)
        at org.apache.lucene.search.IndexSearcher.searchAfter(IndexSearcher.java:439)
        at org.apache.lucene.search.IndexSearcher.search(IndexSearcher.java:450)
        at org.zouzias.spark.lucenerdd.query.LuceneQueryHelpers$.searchParser(LuceneQueryHelpers.scala:107)
        at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.query(LuceneRDDPartition.scala:135)
        at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2$$anonfun$apply$3.apply(LuceneRDD.scala:236)
        at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2$$anonfun$apply$3.apply(LuceneRDD.scala:235)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
        at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$5$$anonfun$apply$2.apply(LuceneRDD.scala:235)

zouzias commented on May 20, 2024

"I've managed to isolate only the Lucene part in order to exclude other possible issues. During the .link part, with 6.2M records in the LuceneRDD and 700,000 records in the other RDD, I get a GC overhead error with the standard GC and 50GB for the executor."

Is this during index creation (after caching and running .link(...))?

From a high-level point of view, there are two points where you might run out of memory:

  1. During index creation, but this should not happen after storing the index on disk (store.mode=disk).
  2. During the execution of .link, since the whole query RDD is collected and broadcast. To avoid this, you need to make sure that the driver has enough memory to collect the 700K queries.

Can you extract from the logs which case happens here?

stork-epitropov commented on May 20, 2024

It's in LuceneRDD.scala at line 233, so I guess it's during the linking itself (execution of the queries).

With 30GB of executor memory I get either GC overhead or OOM. With 50GB for the executor I get the following (I've set the timeout to 1000 seconds):

18/11/14 12:17:04 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 1007154 ms exceeds timeout 1000000 ms                                                                                     
18/11/14 12:17:04 ERROR TaskSchedulerImpl: Lost executor 0 on 192.168.129.50: Executor heartbeat timed out after 1007154 ms

I am now running the same job on a bigger machine with 200GB of memory and will see what the result is.

zouzias commented on May 20, 2024

You need to increase your driver's memory if you run out of memory at this line:

https://github.com/zouzias/spark-lucenerdd/blob/master/src/main/scala/org/zouzias/spark/lucenerdd/LuceneRDD.scala#L233

Put 64GB of memory on both the driver and the executor, since you have a big machine.

Tip: You can try to see if you can link the first 100K queries, and if so, run your job in 7 batches with a lower memory footprint.
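
A rough sketch of that batching idea (assuming queries: RDD[String] holds the 700K query inputs, luceneRDD is the cached index, and mkQuery: String => String builds the query string, matching how .link is called elsewhere in this thread):

// Split the queries into ~7 equal batches and link them one at a time,
// materializing each batch before the next one starts.
val batches = queries.randomSplit(Array.fill(7)(1.0))

val linked = batches
  .map { batch =>
    val part = luceneRDD.link(batch, mkQuery, 20)
    part.cache()
    part.count()   // force this batch before moving on
    part
  }
  .reduce(_ union _)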

zouzias commented on May 20, 2024

I pushed a diff that allows you to select a subset of column names that will not be analyzed:

f255acb

You can use this diff by setting the version to 0.3.4-SNAPSHOT and adding the following line to your SBT build:

resolvers += "OSS Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots"
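
Together with the dependency line it would look roughly like this in build.sbt (the group/artifact coordinates are assumed from the project's Maven publishing; double-check against the README):

// snapshot build containing the "do not analyze selected columns" change
libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.3.4-SNAPSHOT"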

stork-epitropov commented on May 20, 2024

Thanks a lot Zouzias! It will be really useful in my case.
I have another question for you.
On the big machine with 200GB I managed to succeed with 180GB for the executor and 20GB for the driver. But I cannot afford to use such a big machine; it was just for a test. I will try to squeeze into the 64GB machine.
Linking an RDD with 80,000 records works just fine.
Then, for the 700,000-record RDD, I tried to split it with a 50,000 max record size, but I get the following result:
[screenshot]

I am doing the following:

val luceneRDD: LuceneRDD[Row] = LuceneRDD(
      sourceToIndex,
      TermAnalyzerName,
      QueryAnalyzerName,
      SimilarityAlgorithm
    )

luceneRDD.cache

    def tokenizeString: UserDefinedFunction =
      udf((str: String) => {

        if (str != null) {
          val analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET)

          val result: ListBuffer[String] = ListBuffer()
          try {
            val tokenStream = analyzer.tokenStream(null, new StringReader(str))
            tokenStream.reset()
            while (tokenStream.incrementToken()) {
              result += tokenStream
                .getAttribute(classOf[CharTermAttribute])
                .toString
            }
          } catch {
            case e: IOException => throw new RuntimeException()
          }
          if (result.isEmpty) {
            null
          } else {
            result.map(_.replace(":", "\\:"))
          }
        } else {
          null
        }
      })


    val sourceToMatch = sourceDF
      .withColumn(TokenizedNameFieldName, tokenizeString(col(NamesFieldName)))
      .select(TokenizedNameFieldName)
      .rdd
      .map(_.getAs[Seq[String]](TokenizedNameFieldName))

    def genereteLuceneScore(mode: String) = {

      val sourceWithTokenizedNameField = sourceToMatch

      val maxResultSize = args(1).toInt
      var numSlices = (sourceToMatch.count() / 50000).ceil.toInt
      numSlices = if (numSlices == 0) { 1 } else numSlices
      val split = Array.fill(numSlices)(1d)
      var result =
        sparkSession.sparkContext.emptyRDD[(Seq[String], Array[SparkScoreDoc])]
      for (source <- sourceWithTokenizedNameField.randomSplit(split)) {
        if (mode == "broad") {
          val a = generateLuceneBroadScores(source)
          a.count
          result = result.union(a)
        } else if (mode == "precise") {
          val a = generateLucenePreciseScores(source)
          a.count
          result = result.union(a)
        }
      }
      result
    }

    def generateLuceneBroadScores(
        source: RDD[Seq[String]]): RDD[(Seq[String], Array[SparkScoreDoc])] = {

      val sourceWithTokenizedNameField = sourceToMatch

      def generateBroadQuery(row: Seq[String]): String = {

        val queries = new java.util.ArrayList[Query]()

        val tokens: Seq[String] = row
        require(tokens.nonEmpty, "Tokens should be of size greater than 0")

        for (field <- Seq(NamesFieldName, DescriptionsFieldName)) {

          var booleanQueryBuilder = new BooleanQuery.Builder()

          for (token <- tokens) {
            booleanQueryBuilder.add(new TermQuery(new Term(field, token)),
                                    BooleanClause.Occur.SHOULD)
          }

          val booleanQuery: BooleanQuery = booleanQueryBuilder.build()
          queries.add(booleanQuery)
        }
        val result =
          new DisjunctionMaxQuery(queries, 1).toString().replace("~", "")
        result
      }

      luceneRDD.link(sourceWithTokenizedNameField, generateBroadQuery, 20)
    }

    def generateLucenePreciseScores(
        source: RDD[Seq[String]]): RDD[(Seq[String], Array[SparkScoreDoc])] = {

      val sourceWithTokenizedNameField = sourceToMatch

      def generatePreciseQuery(row: Seq[String]): String = {

        val tokens: Seq[String] = row
        require(tokens.nonEmpty, "Tokens should be of size greater than 0")

        var booleanQueryBuilder = new BooleanQuery.Builder()

        for (token <- tokens) {
          booleanQueryBuilder.add(
            new TermQuery(new Term(NamesFieldName, token)),
            BooleanClause.Occur.SHOULD)
        }

        val result: String = (booleanQueryBuilder.build()).toString
        result
      }

      luceneRDD.link(sourceWithTokenizedNameField, generatePreciseQuery, 20)
    }

I am calling count on each iteration to force the generation of the DataFrame; otherwise I guess the optimizer will combine the whole operation into one and I will again reach OOM. The funny thing is that even on the first iteration I get GC overhead, and I see a lot of GC time. When I am matching a single dataset with 80,000 records it goes okay. Do you have any idea why it behaves like that?

stork-epitropov commented on May 20, 2024

One more observation that I find rather strange.
With an index of 6,300,000 records, another RDD with 80,000 records to link, and 1 partition for the LuceneRDD, everything goes okay.
With an index of 6,300,000 records, another RDD with 700,000 records, and various numbers of partitions for the LuceneRDD, I always end up with a GC overhead limit error on line 233.

  1. It's not the driver memory: I've verified that, with enough executor memory, 20GB for the driver is sufficient.
  2. Shouldn't using many partitions split the work into chunks and not consume so much memory at once?

Do you have any idea why neither using many partitions nor splitting the input into equally sized chunks works?

Thanks a lot,
Dimitar

zouzias commented on May 20, 2024

Hi there,

Do you run your Spark job on a cluster or in local mode?

If you run it in local mode, then your whole Spark job runs under a single JVM that consumes in total the amount of the driver's plus the executors' memory.

Regarding (2), in standalone mode I am not sure how memory management in the executors happens. As far as I remember, the executors run within the same JVM (as the driver), so the executors compete for memory.

See also: https://stackoverflow.com/questions/43971022/how-to-tune-memory-for-spark-application-running-in-local-mode

stork-epitropov commented on May 20, 2024

Hi Zouzias, I am running on a standalone cluster with 1 executor.

Yes, they have to split the driver and executor memory. I have verified that the driver does not need more than 20GB for the process to complete successfully, so 40GB are left for the executor.

There is no further way to tune it; I've tried everything.
By the way, I've tried native Lucene:

  1. I collect all the data of the RDD that we index to the driver and create an index in a local directory.
  2. I collect to the driver the single column that is used to build the queries.
  3. I construct all the queries.
  4. I run them against the index built in the local folder.
  5. I build a new RDD from the results.
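
In code, that plain-Lucene flow is roughly the sketch below (paths, the "name" field and collectedNames are placeholders, not the exact code I used):

import java.nio.file.Paths
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{Document, Field, TextField}
import org.apache.lucene.index.{DirectoryReader, IndexWriter, IndexWriterConfig, Term}
import org.apache.lucene.search.{IndexSearcher, TermQuery}
import org.apache.lucene.store.FSDirectory

// 1)-2) build a local index from the collected data (collectedNames: Seq[String])
val dir    = FSDirectory.open(Paths.get("/tmp/local-index"))
val writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))
collectedNames.foreach { name =>
  val doc = new Document()
  doc.add(new TextField("name", name, Field.Store.YES))
  writer.addDocument(doc)
}
writer.close()

// 3)-4) build the queries and run them against the local index
val searcher = new IndexSearcher(DirectoryReader.open(dir))
val hits     = searcher.search(new TermQuery(new Term("name", "acme")), 20)
val matches  = hits.scoreDocs.map(sd => searcher.doc(sd.doc).get("name"))

// 5) turn the results back into an RDD
val resultRDD = sparkSession.sparkContext.parallelize(matches)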

This way I managed to run the linking on the same amount of data.
This made me wonder why LuceneRDD has such a big overhead; might it be due to Spark? Anyway, there are a couple of questions that I could not answer myself and that blocked me from running my application with LuceneRDD. I don't seek an answer, as I will use native Lucene, but they might help you think of ways to optimize the library, as it is a really useful one.

  1. Even with many partitions I get GC overhead when matching a big dataset to a big index.
  2. Even when splitting the big dataset into several chunks I get GC overhead in the linking process.
  3. The LuceneRDD is 17GB while the index in native Lucene is 1.6GB; that is 10 times larger.

Best regards,
Dimitar

zouzias commented on May 20, 2024

Hi Dimitar,

About item 3, I did a small fix some time ago following your comments. The reason LuceneRDD consumed 17GB of memory was that on .cache() I was also caching all the elements of the RDD in memory (now they are stored only on disk).

See 4e833e8

Thank you for reporting the issues!

yeikel commented on May 20, 2024

Hi Dimitar,

About item 3, I did a small fix some time ago following your comments. The reason LuceneRDD consumed 17GB of memory was that on .cache() I was also caching all the elements of the RDD in memory (now they are stored only on disk).

See 4e833e8

Thank you for reporting the issues!

Any chance you can also do a release with this change that is compatible with Spark 2.1?

yeikel commented on May 20, 2024

"Currently, the library analyzes all fields, but I can make a diff to analyze only a selected set of fields. Let me know if this is the case."

Did you get to implement this?

zouzias commented on May 20, 2024

yeikel commented on May 20, 2024

I am not sure why, but I am having this issue again:

20/02/12 14:31:30 WARN TaskSetManager: Lost task 10549.0 in stage 6.0 (TID 11936, executor 186): ExecutorLostFailure (executor 186 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 27.7 GB of 27.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

This is the configuration that I have:

lucenerdd {
  linker.method = "cartesian"
  index {
    store.mode = "disk"
  }
}

20/02/12 17:59:07 INFO LuceneRDDPartition: Config parameter lucenerdd.index.store.mode is set to 'disk'
20/02/12 17:59:07 INFO LuceneRDDPartition: Lucene index will be storage in disk

I am not sure what could be causing OOM when I am using the disk store mode.

yeikel commented on May 20, 2024

The line that seems to be causing this issue is the flatMap at LuceneRDD.scala:222.

Tasks fail in a domino effect until the whole job fails.

[screenshot]

My cluster is relatively big, so I am not sure why this is happening.

[screenshot]

I am trying to self-join a DataFrame with 95 million records. It worked before, so I am not sure why it is failing now.

yeikel commented on May 20, 2024

Do you have any idea, @zouzias?

yeikel commented on May 20, 2024

[screenshots]
