spark-lucenerdd's Introduction

spark-lucenerdd

Spark RDD with Apache Lucene's query capabilities.

The main abstractions are special types of RDD called LuceneRDD, FacetedLuceneRDD and ShapeLuceneRDD, which instantiate a Lucene index on each Spark executor. These RDDs distribute search queries and aggregate search results between the Spark driver and its executors. Currently, the following queries are supported:

| Operation | Syntax | Description |
|---|---|---|
| Term Query | LuceneRDD.termQuery(field, query, topK) | Exact term search |
| Fuzzy Query | LuceneRDD.fuzzyQuery(field, query, maxEdits, topK) | Fuzzy term search |
| Phrase Query | LuceneRDD.phraseQuery(field, query, topK) | Phrase search |
| Prefix Query | LuceneRDD.prefixSearch(field, prefix, topK) | Prefix search |
| Query Parser | LuceneRDD.query(queryString, topK) | Query parser search |
| Faceted Search | FacetedLuceneRDD.facetQuery(queryString, field, topK) | Faceted search |
| Record Linkage | LuceneRDD.link(otherEntity: RDD[T], linkageFct: T => searchQuery, topK) | Record linkage via Lucene queries |
| Circle Search | ShapeLuceneRDD.circleSearch((x,y), radius, topK) | Search within radius |
| Bbox Search | ShapeLuceneRDD.bboxSearch(lowerLeft, upperRight, topK) | Bounding box search |
| Spatial Linkage | ShapeLuceneRDD.linkByRadius(RDD[T], linkage: T => (x,y), radius, topK) | Spatial radius linkage |

Using the query parser, you can perform prefix queries, fuzzy queries, etc., and any combination of those. For more information on using Lucene's query parser, see Query Parser.

Here are a few examples using LuceneRDD for full text search, spatial search and record linkage. All examples exploit Lucene's flexible query language. For spatial search, lucene-spatial and jts are required.

For more, check the wiki. More examples are available at examples and performance evaluation examples on AWS can be found here.
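As a rough illustration of the term query listed in the table above, here is a minimal sketch. It assumes Spark and spark-lucenerdd are on the classpath and uses a local Spark session purely for experimentation. The object name `FullTextSearchSketch` and the method `countHits` are hypothetical, and the field name "_1" relies on the default naming for single-valued inputs described under Project Status and Limitations.

```scala
import org.apache.spark.sql.SparkSession
import org.zouzias.spark.lucenerdd._          // implicit conversions to Lucene documents
import org.zouzias.spark.lucenerdd.LuceneRDD

// Hypothetical wrapper object, for illustration only.
object FullTextSearchSketch {
  // Index a few strings and count the hits of an exact term query.
  def countHits(term: String): Long = {
    // Assumption: a local session for experimentation; use your own session in a real job.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("lucenerdd-sketch")
      .getOrCreate()
    try {
      val words = spark.sparkContext.parallelize(Seq("Hello", "world"))
      val luceneRDD = LuceneRDD(words)              // assumed to index strings under field "_1"
      luceneRDD.termQuery("_1", term, 10).count()   // topK = 10
    } finally spark.stop()
  }
}
```

In spark-shell the session already exists, so only the two imports and the three lines inside the `try` block would be needed.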

Presentations

For an overview of the library, check these ScalaIO 2016 Slides.

Linking

You can link against this library (for Spark 1.4+) in your program at the following coordinates:

Using SBT:

libraryDependencies += "org.zouzias" %% "spark-lucenerdd" % "0.4.0"

Using Maven:

<dependency>
    <groupId>org.zouzias</groupId>
    <artifactId>spark-lucenerdd_2.12</artifactId>
    <version>0.4.0</version>
</dependency>

This library can also be added to Spark jobs launched through spark-shell or spark-submit by using the --packages command line option. For example, to include it when starting the spark shell:

$ bin/spark-shell --packages org.zouzias:spark-lucenerdd_2.12:0.4.0

Unlike using --jars, using --packages ensures that this library and its dependencies will be added to the classpath. The --packages argument can also be used with bin/spark-submit.

Compatibility

The project has the following compatibility with Apache Spark:

| Artifact | Release Date | Spark compatibility | Notes | Status |
|---|---|---|---|---|
| 0.4.1-SNAPSHOT | | >= 3.5.0, JVM 11 | develop | Under Development |
| 0.4.0 | 2023-10-06 | = 3.5.0, JVM 11 | develop | Released |
| 0.3.10 | 2021-06-02 | >= 2.4.8, JVM 8 | tag v0.3.10 | Released |
| 0.3.9 | 2020-11-30 | >= 2.4.7, JVM 8 | tag v.0.3.9 | Released |
| 0.2.8 | 2017-05-30 | 2.1.x, JVM 7 | tag v0.2.8 | Released |
| 0.1.0 | 2016-09-26 | 1.4.x, 1.5.x, 1.6.x | tag v0.1.0 | Cross-released with 2.10/2.11 |

Project Status and Limitations

Implicit conversions for the primitive types (Int, Float, Double, Long, String) are supported. Moreover, implicit conversions for all product types (i.e., tuples and case classes) of the above primitives are supported. Implicits for tuples default the field names to "_1", "_2", "_3", ... following Scala's naming conventions for tuples. In addition, implicits for most Spark DataFrame types are supported (MapType and boolean are missing).
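To make the tuple naming convention concrete, the following is a minimal sketch in plain Scala (no Spark or Lucene required). The names `FieldNames` and `positionalFields` are hypothetical and used only for illustration. This is not the library's implementation; it only mirrors the "_1", "_2", ... convention described above.

```scala
// Illustrative only: mirrors the "_1", "_2", ... naming convention for tuples.
// `FieldNames` and `positionalFields` are hypothetical names, not library API.
object FieldNames {
  // Pair each element of a tuple with its positional field name.
  def positionalFields(p: Product): List[(String, Any)] =
    p.productIterator.zipWithIndex.map { case (value, i) => s"_${i + 1}" -> value }.toList
}
```

For a tuple such as `("Kabul", 1780000)`, the first element would be searchable under the field "_1" and the second under "_2".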

Custom Case Classes

If you want to use your own custom class with LuceneRDD you can do it provided that your class member types are one of the primitive types (Int, Float, Double, Long, String).

For more details, see LuceneRDDCustomcaseClassImplicits under the tests directory.

Development

Docker

A Docker Compose script is set up with some preliminary notebooks in Zeppelin. To start it, run:

docker-compose up

For more LuceneRDD examples on Zeppelin, check these examples.

Build from Source

Install Java, SBT and clone the project

git clone https://github.com/zouzias/spark-lucenerdd.git
cd spark-lucenerdd
sbt compile assembly

The above will create an assembly jar containing spark-lucenerdd functionality under target/scala-*/spark-lucenerdd-assembly-*.jar

spark-lucenerdd's People

Contributors

rafaharo, scala-steward, zouzias

spark-lucenerdd's Issues

How much index size is supported?

Hi all,

I would like to use spark-lucenerdd and want to know how large an index it supports. Can it handle millions of documents? I saw that the default index store configuration uses memory, but our index seems too large to load into memory at once.

Thanks
Aimee

pattern match for suffix

Can we try the same code with pattern matching for a suffix, or something like a regular expression? If anyone has tried a different option, please let me know.

Remove dependency on sbt-spark-package

Support for Linker with Query type

Describe the solution you'd like
I'd like to use the Lucene query builder to generate my queries instead of a query string. The toString method does not guarantee compatibility, and it fails in some cases.

For example :

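import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, FuzzyQuery}

val addressTerms = Seq("harvard", "denver")
val booleanQueryBuilder = new BooleanQuery.Builder()

// Add one optional (SHOULD) fuzzy clause per address term
addressTerms.foreach { t =>
  booleanQueryBuilder.add(new FuzzyQuery(new Term("address", t)), BooleanClause.Occur.SHOULD)
}

// Require at least half of the clauses to match (integer division)
val booleanQuery: BooleanQuery =
  booleanQueryBuilder.setMinimumNumberShouldMatch(addressTerms.size / 2).build()
// The string form of the query is what fails to parse
val query = booleanQuery.toString()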

Produces the following exception :

(address:harvard~2 address:denver~2 )~3': Encountered " <FUZZY_SLOP> "~3 "" at line 1, column 95.

Doing a search and replace for the token "~" is not really an option, because the query would not behave as expected.

Update to SBT 1.x

Is your feature request related to a problem? Please describe.
Version update

Describe the solution you'd like
Update sbt to version 1.x (latest)

Describe alternatives you've considered
None

Additional context
First, #174 must be resolved.

Error when attempting to run example

Hi,

I'm running the record linkage example https://github.com/zouzias/spark-lucenerdd/wiki/Record-Linkage for countries.txt in the cloudera quickstart vm http://www.cloudera.com/downloads/quickstart_vms/5-8.html and getting some errors.

starting the spark shell with the following for spark 1.6 compatibility:
spark-shell --packages org.zouzias:spark-lucenerdd_2.10:0.1.0

first issue is I'm getting the following error as spark starts

evicted modules:
	org.scala-lang#scala-reflect;2.10.4 by [org.scala-lang#scala-reflect;2.10.5] in [default]

I continue with the example but when I run val luceneRDD = LuceneRDD(countries) , I get the following error:
error: No implicit view available from String => org.apache.lucene.document.Document

Please can you advise?

Thanks
Dominic

Support Scala 2.12

Is your feature request related to a problem? Please describe.

Spark supports 2.12 without experimental tag since 2.4.1. It would be nice to support it as well here.

Additional context
Blockers are:

  • spark-testing-base does not support 2.12 yet.

Configurable Similarity Scoring

As a user, I want to be able to change Lucene's scoring similarity.

Minimal functionality: Provide configuration that allows the user to change the similarity from TF-IDF to BM25 (say)

Usable from Java or PySpark

This looks really useful. I'd like to be able to use this from PySpark, or failing that, from Java.
Do you have any pointers?

GC overhead limit exceeded using Disk Mode

I am using store.mode = "disk", but I am still observing the GC overhead limit exceeded exception.

My cluster has 116 GB of RAM with 10 executors of 3 cores each, and I am trying to index 180M documents.

java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.lucene.util.ByteBlockPool$DirectTrackingAllocator.getByteBlock(ByteBlockPool.java:103)
	at org.apache.lucene.util.ByteBlockPool.nextBuffer(ByteBlockPool.java:203)
	at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:118)
	at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:189)
	at org.apache.lucene.index.DefaultIndexingChain$PerField.invert(DefaultIndexingChain.java:843)
	at org.apache.lucene.index.DefaultIndexingChain.processField(DefaultIndexingChain.java:430)
	at org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:394)
	at org.apache.lucene.index.DocumentsWriterPerThread.updateDocument(DocumentsWriterPerThread.java:251)
	at org.apache.lucene.index.DocumentsWriter.updateDocument(DocumentsWriter.java:494)
	at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1616)
	at org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1235)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:72)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:69)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.<init>(LuceneRDDPartition.scala:69)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$.apply(LuceneRDDPartition.scala:260)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$13$$anonfun$apply$7.apply(LuceneRDD.scala:520)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$13$$anonfun$apply$7.apply(LuceneRDD.scala:517)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
19/03/13 14:35:53 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1506,5,main]

How can I set a custom analyzer?

The current method signature is as follows:

indexAnalyzer: String = getOrElseEn(IndexAnalyzerConfigName),
queryAnalyzer: String = getOrElseEn(QueryAnalyzerConfigName), 

It would be great to be able to provide the instance of the analyzer instead of a String. This would open the door to create custom analyzers and use them with LuceneRDD.

I tried to create a package org.apache.lucene.analysis and a class "StandardCustomStopWords" which extends Analyzer, and passed indexAnalyzer = "StandardCustomStopWords", but that does not seem to work. It also did not print any kind of warning, so we might want to consider adding one.

I am guessing that compiling a custom lucene build is a solution , but it is not very convenient or practical.

How to create LuceneRDD in Java?

This seems really helpful to me. But when I try to use it from Java, for example by instantiating it with new LuceneRDD(df.RDD()), it seems to need some other arguments, such as partitionsRDD, indexAnalyzer, queryAnalyzer and similarity. How should these arguments be set?

Thanks.

Is it possible to persist , update and re-use a lucenerdd from disk?

This issue is similar to #136, but I decided to create a new issue to avoid pinging an old and closed one.

I have a relatively big table (~180 million records), which I link daily against a smaller set (200k).

From what I understand, every time I launch the process I need to produce tokens for both tables, which is very time-consuming for the big table.

Is it possible to index the big table once and store the index on disk for later use? Also, is it possible to append to or update it? (The big table will receive updates daily.)

In general, ElasticSearch provides this functionality, but it requires an HTTP call and a second infrastructure, which I'd like to avoid.

Indexing is successful even if the types are not supported/unexpected

Describe the bug
If the indexing process is not successful (for example, because of unsupported types), the logs still show that the document was indexed successfully.

To Reproduce

val entities = Seq(
  ("123", Seq("Peter", "Johan"), "123"),
  ("125", Seq("Peter"), "123"),
  ("1234", Seq("Marc", "Johan"), "210")
).toDF("id", "names", "address").coalesce(1)

val entitiesRDD = LuceneRDD(entities)

entitiesRDD.count

The names column is not supported because it is a multivalued entry, and the document returned from indexing is an empty document.

19/03/29 03:26:08 INFO LuceneRDDPartition: Index disk location C:\Users\user\AppData\Local\Temp\
19/03/29 03:26:08 INFO LuceneRDDPartition: Config parameter lucenerdd.index.store.mode is set to 'disk'
19/03/29 03:26:08 INFO LuceneRDDPartition: Lucene index will be storage in disk
19/03/29 03:26:08 INFO LuceneRDDPartition: Index disk location C:\Users\user\AppData\Local\Temp\
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0] Partition is created...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process initiated at 2019-03-29T03:26:08.170-04:00...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process completed at 2019-03-29T03:26:08.755-04:00...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process took 0 seconds...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexed 1 documents

Expected behavior

I believe indexing should fail or at least give a warning

This may be related to #115

Runtime error running the samples : java.lang.NoSuchMethodError: org.apache.lucene.index.IndexWriter.addDocument(Ljava/lang/Iterable;)V

Describe the bug
I am running the LinkageAbtvsBuy example as is, but I am receiving the following exception at runtime:

2019-01-23 11:18:01 ERROR Executor:91 - Exception in task 1.0 in stage 6.0 (TID 9)
java.lang.NoSuchMethodError: org.apache.lucene.index.IndexWriter.addDocument(Ljava/lang/Iterable;)V
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategoryDocument(DirectoryTaxonomyWriter.java:496)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.internalAddCategory(DirectoryTaxonomyWriter.java:457)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategory(DirectoryTaxonomyWriter.java:424)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:208)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:287)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:304)
	at org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter$class.taxoWriter(IndexWithTaxonomyWriter.scala:37)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter$lzycompute(LuceneRDDPartition.scala:47)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter(LuceneRDDPartition.scala:47)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:70)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:67)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.<init>(LuceneRDDPartition.scala:67)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$.apply(LuceneRDDPartition.scala:243)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2019-01-23 11:18:01 ERROR Executor:91 - Exception in task 0.0 in stage 6.0 (TID 8)
java.lang.NoSuchMethodError: org.apache.lucene.index.IndexWriter.addDocument(Ljava/lang/Iterable;)V
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategoryDocument(DirectoryTaxonomyWriter.java:496)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.internalAddCategory(DirectoryTaxonomyWriter.java:457)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategory(DirectoryTaxonomyWriter.java:424)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:208)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:287)
	at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:304)
	at org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter$class.taxoWriter(IndexWithTaxonomyWriter.scala:37)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter$lzycompute(LuceneRDDPartition.scala:47)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter(LuceneRDDPartition.scala:47)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:70)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:67)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.<init>(LuceneRDDPartition.scala:67)
	at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$.apply(LuceneRDDPartition.scala:243)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
	at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

To Reproduce
Run LinkageAbtvsBuy with spark-lucenerdd version: 0.2.8 and Spark 2.1.0

Expected behavior
It should not produce such exception

Versions (please complete the following information):

  • spark-lucenerdd version: 0.2.8
  • Spark Version: 2.1.0
  • Java version: 8
  • Scala : 2.11.8

AnalysisException: resolved attribute(s) missing for blockEntityLinkage

Describe the bug

I'd like to link two dataframes blocking on a field named cd using blockEntityLinkage with the following Schemas :

df1
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- cd: string (nullable = true)
df2
 |-- name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- cd: string (nullable = true)

val linkedResults = LuceneRDD.blockEntityLinkage(
  df1,
  df2,
  linker,
  Array("cd"),
  Array("cd"),
  500
)

But it produces the following exception :

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) cd#351 missing from name#444,address#454,cd#101 in operator !Project [name#444, address#454, cd#101, concat(cd#351) AS __PARTITION_COLUMN__#481];;
!Project [name#444, address#454, cd#101, concat(cd#351) AS __PARTITION_COLUMN__#481]
+- Project [name#444, address#454, cd#101]

A self join in df1 works just fine

I tried renaming the column but it did not work.

I am running Spark 2.1.0 and lucenerdd 0.3.3

Edit : An explanation about the issue can be found here but I don't believe it can be fixed on my end.

Thank you

Question about indexing process

While checking my logs, I saw the following events :

2019-03-12 14:24:09,730 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexing process completed at 2019-03-12T14:24:09.730-07:00...
2019-03-12 14:24:09,730 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexing process took 28 seconds...
2019-03-12 14:24:10,046 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexed 852063 documents
2019-03-12 14:24:13,465 INFO  [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexing process completed at 2019-03-12T14:24:13.465-07:00...
2019-03-12 14:24:13,465 INFO  [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexing process took 32 seconds...
2019-03-12 14:24:13,897 INFO  [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexed 986009 documents
2019-03-12 14:23:36,578 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]Indexing process initiated at 2019-03-12T14:23:36.510-07:00...
2019-03-12 14:23:36,584 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]Indexing process completed at 2019-03-12T14:23:36.582-07:00...
2019-03-12 14:23:36,585 INFO  [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]**Indexing process took 0 seconds...**

What do you think could be the reason that indexing took 0 seconds (with 0 documents) for some partitions, while other partitions hold 852063 documents or more? Could that be an example of skewed partitions?
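One way to check for skew is to count the records in each partition of the input before indexing. The sketch below uses only the standard Spark RDD API and assumes Spark is on the classpath. The names `PartitionSizes` and `countPerPartition` are hypothetical and not part of spark-lucenerdd.

```scala
import org.apache.spark.rdd.RDD

// Hypothetical helper, for illustration only.
object PartitionSizes {
  // Number of records in each partition, as (partitionIndex, count) pairs.
  def countPerPartition[T](rdd: RDD[T]): Array[(Int, Long)] =
    rdd.mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size.toLong)) }.collect()
}
```

If the counts differ widely (for instance, some partitions are empty while others hold close to a million records), repartitioning the input before building the index may even out the indexing time.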

Error when creating LuceneRDD in spark lucene rdd

Hi,

I am trying to use record linkage with LuceneRDD on Spark 1.6.1, using https://github.com/zouzias/spark-lucenerdd/tree/branch_spark_1.x
I followed all the steps mentioned in the article.
When I run val luceneRDD = LuceneRDD(countries), I get the error below:

error: No implicit view available from String => org.apache.lucene.document.Document
LuceneRDD(countries)
^

Any help in solving this issue is highly appreciated.
Thanks in advance

Exception "requirement failed: TopK requires at least K>0" with collect when no results are found

Describe the bug

If we try to collect results when no results were found, the following exception is thrown:

Exception "requirement failed: TopK requires at least K>0"

To Reproduce


val entities = Seq("123").toDF("numbers")
entities.show(false)
val entitiesRDD = LuceneRDD(entities)
val results = entitiesRDD.termQuery("numbers","1234")
println(results.count) // 0 
results.collect()
requirement failed: TopK requires at least K>0
java.lang.IllegalArgumentException: requirement failed: TopK requires at least K>0
	at com.twitter.algebird.TopKMonoid.<init>(TopKMonoid.scala:52)
	at org.zouzias.spark.lucenerdd.response.LuceneRDDResponse.collect(LuceneRDDResponse.scala:79)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues$$anonfun$1.apply(LuceneRDDDataFrameMultivalues.scala:53)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues$$anonfun$1.apply(LuceneRDDDataFrameMultivalues.scala:33)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
	at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
	at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
	at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
	at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
	at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
	at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.org$scalatest$BeforeAndAfterEach$$super$runTest(LuceneRDDDataFrameMultivalues.scala:25)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.runTest(LuceneRDDDataFrameMultivalues.scala:25)
	at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
	at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:393)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:370)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:407)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:376)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
	at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
	at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
	at org.scalatest.Suite$class.run(Suite.scala:1124)
	at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
	at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
	at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
	at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.org$scalatest$BeforeAndAfterAll$$super$run(LuceneRDDDataFrameMultivalues.scala:25)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.run(LuceneRDDDataFrameMultivalues.scala:25)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1349)
	at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1343)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1012)
	at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
	at org.scalatest.tools.Runner$.run(Runner.scala:850)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)

Expected behavior
I don't think it should fail
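Until the library handles this case, one possible caller-side workaround is to check the result count before collecting. The helper below is a hypothetical sketch in plain Scala (`SafeCollect` and `collectOrEmpty` are not part of the library). It evaluates the collect action only when the count is positive.

```scala
import scala.reflect.ClassTag

// Hypothetical helper, for illustration only.
object SafeCollect {
  // Evaluate `collect` only when `count` is positive; otherwise return an empty array.
  // Both arguments are by-name, so `collect` is never run for an empty result.
  def collectOrEmpty[A: ClassTag](count: => Long)(collect: => Array[A]): Array[A] =
    if (count > 0) collect else Array.empty[A]
}
```

With the reproduction above, this could be called as `SafeCollect.collectOrEmpty(results.count)(results.collect())`, assuming `collect()` returns an array. Note that the extra `count` triggers an additional Spark job.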

[Implicits] Support MapType for Spark DataFrames

Is your feature request related to a problem? Please describe.
Currently, MapType columns are not supported for Spark DataFrames.

Describe the solution you'd like
Add support for MapType Spark DataFrame columns

Recommendation: Naming Persisted/Cached RDDs

After integrating LuceneRDD into our Spark application, I noticed a bunch of unnamed MapPartitionsRDD entries showing up on the sparkmaster:4040/storage web page. There are several places in this codebase where rdd.persist(StorageLevel) is called without something like rdd.name = "LuceneRDDResponse" to make it clear which RDDs are contributing to storage. It looks like you have a setName method on LuceneRDD that could easily be extended to other RDDs in your library.

How to persist the indexed data and use it at a later time?

As creating an index for massive data costs a lot of time, it is reasonable to persist the indexed data and search it at a later time.

I have tried to save the LuceneRDD as a text file, but it does not seem to contain the index data.

So how can I persist it?

DataFrame to LuceneRDD

Currently I am able to convert a DataFrame to a LuceneRDD, but when querying I am not getting any results.
Schema of the RDD:

root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- CountryCode: string (nullable = false)
|-- District: string (nullable = false)
|-- Population: integer (nullable = false)

Example Record : [1,Kabul,AFG,Kabol,1780000]
Scala Code:

luceneRDD.termQuery("name","Kabul",1)

Can anyone help me?

Questions about query results

I read your GitHub project spark-lucenerdd; it's great and the code is elegant. I have a question about LuceneRDD.*query().collect(): are the results returned by collect the global top K? For example, when we index a Dataset that consists of several partitions, each partition is processed and stores its own Lucene index; after that, the search runs against each partition's index and the results are collected. Is it correct that the query results from the different partitions are sorted by their score? (I ask because I think the inverted list is not global.) Please forgive me if there is any misunderstanding! (My English is not very good, but I am working on it O(∩_∩)O). Thanks!
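To make the question concrete, here is a minimal sketch of what a "global top K" merge over per-partition results could look like. It is plain Scala with no Spark or Lucene dependency; ScoredDoc and GlobalTopK are hypothetical names invented for the illustration, and nothing here is taken from the library's actual implementation. It also assumes that scores from different partitions are directly comparable, which may only hold approximately when each partition builds its own index with its own term statistics.

```scala
// Hypothetical result type: a document id, its source partition and its score.
final case class ScoredDoc(docId: Int, partition: Int, score: Float)

object GlobalTopK {
  // Merge per-partition top-K lists into one list of the k best hits,
  // highest score first.
  def merge(perPartition: Seq[Seq[ScoredDoc]], k: Int): Seq[ScoredDoc] =
    perPartition.flatten.sortWith(_.score > _.score).take(k)

  def main(args: Array[String]): Unit = {
    val partition0 = Seq(ScoredDoc(1, 0, 3.2f), ScoredDoc(7, 0, 1.1f))
    val partition1 = Seq(ScoredDoc(4, 1, 2.5f), ScoredDoc(9, 1, 0.4f))
    merge(Seq(partition0, partition1), 2).foreach(println)
  }
}
```

Each partition only needs to return its own k best hits, since no document outside a partition's local top K can appear in the global top K.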

Trying to index a 26.1 GB, 6.3 million row DataFrame causes the Spark job to get stuck

Describe the bug

I am trying to create a LuceneRDD from a DataFrame with 6.3 million rows and 5 columns (some of them are arrays). The Spark job just gets stuck: no logs, no debug info, nothing. The executor times out after some time.

I am using the following approach:

To Reproduce

I am running a standalone cluster on one machine with 64 GB of RAM and 8 cores.
I am using 1 executor with 50 GB of memory, 10 GB of memory for the driver, 7 cores and 21 partitions.

I am initiating the LuceneRDD in the following way:

private val luceneRDD: LuceneRDD[Row] = LuceneRDD(df.coalesce(1), "", "", "bm25")

Versions (please complete the following information):

  • spark-lucenerdd version: 0.3.3
  • Maven version: 3.5.2
  • Spark Version: 2.3.1
  • Java version: Java 8

Support field-based analysis

In Lucene, we can analyze different fields using different analyzers. As initiated in #156, the next natural step would be to support different analysis depending on the field.

Example using Lucene Core:

Map<String, Analyzer> analyzerPerField = new HashMap<>();
analyzerPerField.put("firstname", new StandardAnalyzer());
analyzerPerField.put("address", new WhitespaceAnalyzer());

PerFieldAnalyzerWrapper aWrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer(), analyzerPerField);

IndexWriterConfig config = new IndexWriterConfig(aWrapper);

More information: PerFieldAnalyzerWrapper

Store query that links the documents

Is your feature request related to a problem? Please describe.

Currently, I generate the linker dynamically from the input. If the data for a particular field is not present, I don't include it in the linker.

When I retrieve the results, I would like to have an extra column that shows me which query linked the two documents together, so I can debug and improve the queries if needed.

Due to the nature and complexity of Lucene (analyzers, tokens, filters, etc.), I believe this feature can help in the debugging process.

I am looking for something like this:

DataFrame = spark.createDataFrame(linkedResults
       .map { case (left, topDocs , query) =>

Or perhaps:

DataFrame = spark.createDataFrame(linkedResults
       .map { case (left, topDocs ) =>
topDocs.query

Describe alternatives you've considered

Logging is an option I can think of, but I would need to match the queries to the results manually from the logs, which is not really practical.

Another option I can think of is to create a column and store the query before the linker is executed. The only problem I have with this solution is that I need to run the function that generates the linker twice: once when preparing the data (excluding this column from the analysis) and again when the actual link execution happens. For a large input this could be very time consuming. This is a more realistic alternative and I believe I could implement it right now.
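The second alternative can be sketched in plain Scala. Person, buildQuery and the field names are hypothetical and exist only for this illustration; the sketch shows the idea of generating each query once and carrying it next to its record, without using any spark-lucenerdd API.

```scala
// Hypothetical record type, for illustration only.
final case class Person(name: Option[String], city: Option[String])

object QueryColumn {
  // Build a query string from whichever fields are present.
  def buildQuery(p: Person): String =
    Seq(p.name.map(n => s"name:$n"), p.city.map(c => s"city:$c"))
      .flatten
      .mkString(" AND ")

  def main(args: Array[String]): Unit = {
    val people = Seq(
      Person(Some("alice"), None),
      Person(Some("bob"), Some("paris"))
    )

    // Generate each query once and keep it alongside its record.
    val withQuery: Seq[(Person, String)] = people.map(p => (p, buildQuery(p)))

    // A linker that reads the stored query instead of rebuilding it.
    val linker: ((Person, String)) => String = _._2

    withQuery.foreach { row => println(s"${row._1} -> ${linker(row)}") }
  }
}
```

If the linker reads the precomputed column in this way, the query-generating function only has to run once per record, and the stored query stays available for debugging after the link step.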

java.lang.NoSuchMethodError for

Describe the bug
I am getting the following error when I am running the code.
java.lang.NoSuchMethodError: org.apache.lucene.index.MultiFields.getTermPositionsEnum(Lorg/apache/lucene/index/IndexReader;Ljava/lang/String;Lorg/apache/lucene/util/BytesRef;I)Lorg/apache/lucene/index/PostingsEnum;
at org.apache.lucene.facet.taxonomy.directory.TaxonomyIndexArrays.initParents(TaxonomyIndexArrays.java:131)
at org.apache.lucene.facet.taxonomy.directory.TaxonomyIndexArrays.<init>(TaxonomyIndexArrays.java:55)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.getTaxoArrays(DirectoryTaxonomyWriter.java:732)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategoryDocument(DirectoryTaxonomyWriter.java:503)
at

To Reproduce
Code:
val array = Array("Hello", "world")
val rdd = LuceneRDD(array)
val result = rdd.termQuery("_1", "hello", 10)
result.foreach(println)

MVN Version: 2.8

Spark Version: 2.2

Question about blockEntityLinkage

I have a question about the following line:

 val linkedResults = LuceneRDD.blockEntityLinkage(a, b,
      linker,
      Array("blocker"),
      Array("blocker"),
      500
    )

How will it link a and b?

a) Search for all the entries of a in b,
b) Search for all the entries of b in a, or
c) Combine a and b and search both?

Ideally, I'd like to achieve option b), where I have a small table "b" that I'd like to link with a larger table "a".

Thank you.
