zouzias / spark-lucenerdd
Spark RDD with Lucene's query and entity linkage capabilities
License: Apache License 2.0
Hi,
I am trying to use record linkage with LuceneRDD on Spark 1.6.1 using https://github.com/zouzias/spark-lucenerdd/tree/branch_spark_1.x
I followed all the steps mentioned in the article.
When I do val luceneRDD = LuceneRDD(countries) I get the error below:
error: No implicit view available from String => org.apache.lucene.document.Document
LuceneRDD(countries)
^
Any help to solve this issue is highly appreciated.
Thanks in advance
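For completeness, this is roughly my setup in the spark-shell (a minimal sketch; I assume the String-to-Document implicit views are supposed to come from the package object import):
import org.zouzias.spark.lucenerdd._
import org.zouzias.spark.lucenerdd.LuceneRDD
val countries = sc.textFile("countries.txt")
val luceneRDD = LuceneRDD(countries)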
As a user, I want to be able to configure the indexing and the query analyzers.
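For example, something along these lines at construction time would cover my use case (a sketch of desired usage; df is any DataFrame and the analyzer names are illustrative):
// desired: choose the index and query analyzers when building the RDD (names illustrative)
val luceneRDD = LuceneRDD(df, indexAnalyzer = "whitespace", queryAnalyzer = "en")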
I am using store.mode = "disk", but I am still observing a GC overhead limit exceeded exception.
My cluster has 116 GB of RAM with 10 executors with 3 cores each, and I am trying to index 180M documents.
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.lucene.util.ByteBlockPool$DirectTrackingAllocator.getByteBlock(ByteBlockPool.java:103)
at org.apache.lucene.util.ByteBlockPool.nextBuffer(ByteBlockPool.java:203)
at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:118)
at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:189)
at org.apache.lucene.index.DefaultIndexingChain$PerField.invert(DefaultIndexingChain.java:843)
at org.apache.lucene.index.DefaultIndexingChain.processField(DefaultIndexingChain.java:430)
at org.apache.lucene.index.DefaultIndexingChain.processDocument(DefaultIndexingChain.java:394)
at org.apache.lucene.index.DocumentsWriterPerThread.updateDocument(DocumentsWriterPerThread.java:251)
at org.apache.lucene.index.DocumentsWriter.updateDocument(DocumentsWriter.java:494)
at org.apache.lucene.index.IndexWriter.updateDocument(IndexWriter.java:1616)
at org.apache.lucene.index.IndexWriter.addDocument(IndexWriter.java:1235)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:72)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:69)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.<init>(LuceneRDDPartition.scala:69)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$.apply(LuceneRDDPartition.scala:260)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$13$$anonfun$apply$7.apply(LuceneRDD.scala:520)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$13$$anonfun$apply$7.apply(LuceneRDD.scala:517)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
19/03/13 14:35:53 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 1506,5,main]
I have a question about the following call:
val linkedResults = LuceneRDD.blockEntityLinkage(a, b,
linker,
Array("blocker"),
Array("blocker"),
500
)
How will it link a and b?
a) Search for all the entries of a in b,
b) Search for all the entries of b in a, or
c) Combine a and b and search both?
Ideally, I'd like to achieve option b), where I have a small table "b" that I'd like to link with a larger table "a".
Thank you.
Describe the bug
If we try to collect results after no results are found, the following exception is thrown:
Exception "requirement failed: TopK requires at least K>0"
To Reproduce
val entities = Seq("123").toDF("numbers")
entities.show(false)
val entitiesRDD = LuceneRDD(entities)
val results = entitiesRDD.termQuery("numbers","1234")
println(results.count) // 0
results.collect()
requirement failed: TopK requires at least K>0
java.lang.IllegalArgumentException: requirement failed: TopK requires at least K>0
at com.twitter.algebird.TopKMonoid.<init>(TopKMonoid.scala:52)
at org.zouzias.spark.lucenerdd.response.LuceneRDDResponse.collect(LuceneRDDResponse.scala:79)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues$$anonfun$1.apply(LuceneRDDDataFrameMultivalues.scala:53)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues$$anonfun$1.apply(LuceneRDDDataFrameMultivalues.scala:33)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1682)
at org.scalatest.TestSuite$class.withFixture(TestSuite.scala:196)
at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1679)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1692)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1692)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.org$scalatest$BeforeAndAfterEach$$super$runTest(LuceneRDDDataFrameMultivalues.scala:25)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.runTest(LuceneRDDDataFrameMultivalues.scala:25)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1750)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:393)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:370)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:407)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:381)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:381)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:376)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:458)
at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1750)
at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1685)
at org.scalatest.Suite$class.run(Suite.scala:1124)
at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1685)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1795)
at org.scalatest.SuperEngine.runImpl(Engine.scala:518)
at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1795)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.org$scalatest$BeforeAndAfterAll$$super$run(LuceneRDDDataFrameMultivalues.scala:25)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.zouzias.spark.lucenerdd.LuceneRDDDataFrameMultivalues.run(LuceneRDDDataFrameMultivalues.scala:25)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1349)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1343)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1343)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1012)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1509)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1011)
at org.scalatest.tools.Runner$.run(Runner.scala:850)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
Expected behavior
I don't think it should fail; collecting an empty result should simply return an empty collection.
As a user, I want LuceneRDD to handle arrays in input RDDs or DataFrames.
Hi,
I'm running the record linkage example https://github.com/zouzias/spark-lucenerdd/wiki/Record-Linkage for countries.txt in the Cloudera QuickStart VM http://www.cloudera.com/downloads/quickstart_vms/5-8.html and getting some errors.
I start the spark shell as follows for Spark 1.6 compatibility:
spark-shell --packages org.zouzias:spark-lucenerdd_2.10:0.1.0
The first issue is that I get the following message as Spark starts:
evicted modules:
org.scala-lang#scala-reflect;2.10.4 by [org.scala-lang#scala-reflect;2.10.5] in [default]
I continue with the example, but when I run val luceneRDD = LuceneRDD(countries), I get the following error:
error: No implicit view available from String => org.apache.lucene.document.Document
Please can you advise?
Thanks
Dominic
In Lucene, we have the ability to analyze different fields using different analyzers. As initiated in #156, the next natural step would be to support different analysis depending on the field.
Example using Lucene Core :
Map<String, Analyzer> analyzerPerField = new HashMap<>();
analyzerPerField.put("firstname", new StandardAnalyzer());
analyzerPerField.put("address", new WhitespaceAnalyzer());
PerFieldAnalyzerWrapper aWrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer(), analyzerPerField);
IndexWriterConfig config = new IndexWriterConfig(aWrapper);
More information : PerFieldAnalyzerWrapper
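For reference, a rough Scala equivalent of the snippet above (a sketch; it assumes the same Lucene analyzer classes are on the classpath):
import scala.collection.JavaConverters._
import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.core.WhitespaceAnalyzer
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.index.IndexWriterConfig

// analyze "firstname" with the standard analyzer and "address" with the whitespace analyzer
val analyzerPerField: java.util.Map[String, Analyzer] =
  Map[String, Analyzer]("firstname" -> new StandardAnalyzer(), "address" -> new WhitespaceAnalyzer()).asJava
val aWrapper = new PerFieldAnalyzerWrapper(new StandardAnalyzer(), analyzerPerField)
val config = new IndexWriterConfig(aWrapper)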
Add a version method that returns project-related information, e.g., the version number.
Describe the solution you'd like
I'd like to use the Lucene query builder to generate my queries instead of the query String. The toString method does not guarantee compatibility and fails for some cases.
For example:
import org.apache.lucene.index.Term
import org.apache.lucene.search.{BooleanClause, BooleanQuery, FuzzyQuery}

val addressTerms = Seq("harvard", "denver")
val booleanQueryBuilder = new BooleanQuery.Builder()
addressTerms.foreach { t =>
  booleanQueryBuilder.add(new FuzzyQuery(new Term("address", t)), BooleanClause.Occur.SHOULD)
}
val booleanQuery: BooleanQuery = booleanQueryBuilder.setMinimumNumberShouldMatch(addressTerms.size / 2).build()
val query = booleanQuery.toString()
This produces the following exception:
(address:harvard~2 address:denver~2 )~3': Encountered " <FUZZY_SLOP> "~3 "" at line 1, column 95.
Doing a search-and-replace for the token "~" is not really an option because the query would not behave as expected.
Currently, termQuery, prefixQuery, etc. take a fieldName parameter, and this parameter is not checked for existence.
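A quick illustration (a sketch; the field name is made up, and I assume the query currently just returns no hits instead of failing fast):
val rdd = LuceneRDD(Array("Hello", "world"))
// "no_such_field" is never indexed; presumably this silently returns an empty result
val results = rdd.termQuery("no_such_field", "hello", 10)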
More thorough unit testing on feature #163
Describe the bug
I am getting the following error when running the code.
java.lang.NoSuchMethodError: org.apache.lucene.index.MultiFields.getTermPositionsEnum(Lorg/apache/lucene/index/IndexReader;Ljava/lang/String;Lorg/apache/lucene/util/BytesRef;I)Lorg/apache/lucene/index/PostingsEnum;
at org.apache.lucene.facet.taxonomy.directory.TaxonomyIndexArrays.initParents(TaxonomyIndexArrays.java:131)
at org.apache.lucene.facet.taxonomy.directory.TaxonomyIndexArrays.(TaxonomyIndexArrays.java:55)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.getTaxoArrays(DirectoryTaxonomyWriter.java:732)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategoryDocument(DirectoryTaxonomyWriter.java:503)
at
To Reproduce
Code:
val array = Array("Hello", "world")
val rdd = LuceneRDD(array)
val result = rdd.termQuery("_1", "hello", 10)
result.foreach(println)
MVN Version: 2.8
Spark Version: 2.2
Describe the bug
I'd like to link two DataFrames, blocking on a field named cd, using blockEntityLinkage with the following schemas:
df1
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- cd: string (nullable = true)
df2
|-- name: string (nullable = true)
|-- address: string (nullable = true)
|-- cd: string (nullable = true)
val linkedResults = LuceneRDD.blockEntityLinkage(df1 , df2 ,
linker,
Array("cd"),
Array("cd"),
500
)
But it produces the following exception :
Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) cd#351 missing from name#444,address#454,cd#101 in operator !Project [name#444, address#454, cd#101, concat(cd#351) AS __PARTITION_COLUMN__#481];;
!Project [name#444, address#454, cd#101, concat(cd#351) AS __PARTITION_COLUMN__#481]
+- Project [name#444, address#454, cd#101]
A self-join on df1 works just fine.
I tried renaming the column but it did not work.
I am running Spark 2.1.0 and lucenerdd 0.3.3.
Edit: An explanation about the issue can be found here, but I don't believe it can be fixed on my end.
Thank you
Given that the current method signature is described as follows:
indexAnalyzer: String = getOrElseEn(IndexAnalyzerConfigName),
queryAnalyzer: String = getOrElseEn(QueryAnalyzerConfigName),
It would be great to be able to provide an instance of the analyzer instead of a String. This would open the door to creating custom analyzers and using them with LuceneRDD.
I tried to create a package org.apache.lucene.analysis and a class "StandardCustomStopWords" which extends Analyzer, and passed indexAnalyzer = "StandardCustomStopWords", but that does not seem to work. It also did not print any kind of warning, so we might want to consider adding that as well.
I am guessing that compiling a custom Lucene build is a solution, but it is not very convenient or practical.
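For illustration, this is roughly the kind of analyzer I tried to plug in (a minimal sketch; the stop-word list is made up, and exact class locations may differ slightly across Lucene versions):
package org.apache.lucene.analysis

import java.util.Arrays
import org.apache.lucene.analysis.Analyzer.TokenStreamComponents
import org.apache.lucene.analysis.standard.StandardTokenizer

// Standard tokenization followed by a custom stop-word filter (stop words are illustrative)
class StandardCustomStopWords extends Analyzer {
  private val stopWords = new CharArraySet(Arrays.asList("ltd", "inc", "corp"), true)

  override protected def createComponents(fieldName: String): TokenStreamComponents = {
    val source = new StandardTokenizer()
    new TokenStreamComponents(source, new StopFilter(source, stopWords))
  }
}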
After integrating LuceneRDD into our Spark application, I noticed a bunch of unnamed MapPartitionsRDDs showing up on the sparkmaster:4040/storage web page. There are several places in this codebase where rdd.persist(StorageLevel) is called without doing something like rdd.name = "LuceneRDDResponse" to make it clearer which RDDs are contributing to storage. It looks like you have a setName method on LuceneRDD that could easily be extended to other RDDs in your library.
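Something along these lines inside the library would make the Storage page much more readable (a sketch; the RDD variable and the name are illustrative):
import org.apache.spark.storage.StorageLevel

// name the RDD before persisting so it shows up with a meaningful label on the Storage page
partitionsRDD.setName("LuceneRDDPartitions").persist(StorageLevel.MEMORY_AND_DISK)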
Is your feature request related to a problem? Please describe.
Version update
Describe the solution you'd like
Update sbt to version 1.x (latest)
Describe alternatives you've considered
None
Additional context
First, #174 must be resolved.
As creating an index for massive data costs a lot of time, it is reasonable to persist the indexed data and search with it later.
I have tried to save the luceneRDD as a text file, but it does not seem to contain the index data.
So how can I persist it?
This looks really useful. I'd like to be able to use this from PySpark, or failing that, from Java.
Do you have any pointers?
This is an issue similar to #136, but I decided to create a new issue to avoid pinging an old and closed one.
I have a relatively big table (~180 million records) which I link daily against a smaller set (200k).
From what I understand, every time I launch the process I need to produce tokens for both tables, which is very time consuming for the big table.
Is it possible to do a one-time indexing process over the big table and store it on disk for later use? Also, is it possible to append to/update it? (The big table will receive updates daily.)
In general, Elasticsearch provides this functionality, but it requires an HTTP call and a second infrastructure, which I'd like to avoid.
I have a Lucene index which I'd like to use to process a large corpus of data.
Is it possible with this library to load a prebuilt Lucene index and then process the data against it?
Is your feature request related to a problem? Please describe.
Spark has supported Scala 2.12 without the experimental tag since 2.4.1. It would be nice to support it here as well.
Additional context
Blockers are:
Currently I am able to convert a DataFrame to a LuceneRDD, but while querying I am not getting anything.
Schema of the RDD:
root
|-- ID: integer (nullable = false)
|-- Name: string (nullable = false)
|-- CountryCode: string (nullable = false)
|-- District: string (nullable = false)
|-- Population: integer (nullable = false)
Example record: [1,Kabul,AFG,Kabol,1780000]
Scala code:
luceneRDD.termQuery("name","Kabul",1)
Can anyone help me?
As a user, I want to be able to change Lucene's scoring similarity.
Minimal functionality: Provide configuration that allows the user to change the similarity from TF-IDF to BM25 (say)
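A sketch of the minimal shape this could take (the "bm25" value and analyzer names are illustrative; the argument order mirrors the LuceneRDD(df, indexAnalyzer, queryAnalyzer, similarity) form):
// illustrative: pick BM25 instead of the default similarity at construction time
val luceneRDD = LuceneRDD(df, "en", "en", "bm25")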
Is your feature request related to a problem? Please describe.
Currently, MapType columns are not supported for Spark DataFrames.
Describe the solution you'd like
Add support for MapType Spark DataFrame columns
Describe the bug
If the indexing process is not successful (example: unsupported types), the logs show that the document was indexed successfully.
To Reproduce
val entities = Seq(
  ("123", Seq("Peter", "Johan"), "123"),
  ("125", Seq("Peter"), "123"),
  ("1234", Seq("Marc", "Johan"), "210")
).toDF("id", "names", "address").coalesce(1)
val entitiesRDD = LuceneRDD(entities)
entitiesRDD.count
The names field is not supported because it is a multivalued entry, and the document returned from indexing is an empty document.
19/03/29 03:26:08 INFO LuceneRDDPartition: Index disk location C:\Users\user\AppData\Local\Temp\
19/03/29 03:26:08 INFO LuceneRDDPartition: Config parameter lucenerdd.index.store.mode is set to 'disk'
19/03/29 03:26:08 INFO LuceneRDDPartition: Lucene index will be storage in disk
19/03/29 03:26:08 INFO LuceneRDDPartition: Index disk location C:\Users\user\AppData\Local\Temp\
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0] Partition is created...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process initiated at 2019-03-29T03:26:08.170-04:00...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process completed at 2019-03-29T03:26:08.755-04:00...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexing process took 0 seconds...
19/03/29 03:26:08 INFO LuceneRDDPartition: [partId=0]Indexed 1 documents
Expected behavior
I believe indexing should fail, or at least emit a warning.
This may be related to #115
Add an implicit conversion for DataFrame and assume that the spatial shape is a field named __shape__.
Describe the bug
I am trying to create a LuceneRDD from a DataFrame with 6.3 million rows and 5 columns (some of them are arrays). The Spark job just gets stuck: no logs, no debug info, nothing. The executor then times out after some time.
I am using the following approach:
To Reproduce
I am running a standalone cluster on one machine with 64 GB RAM and 8 cores.
I am using 1 executor with 50 GB memory, 10 GB memory for the driver, 7 cores and 21 partitions.
I am initiating the LuceneRDD in the following way:
private val luceneRDD: LuceneRDD[Row] = LuceneRDD(df.coalesce(1), "", "", "bm25")
I read your GitHub project spark-lucenerdd; it's great and the code is elegant. I have a question about LuceneRDD.*query().collect(): are the results returned by collect the global top-K? For example, when we index a DataSet that consists of several partitions, every partition is processed and stores its own Lucene index; after that, we search each partition's index and collect the results. Is it correct that the query results from the different partitions are sorted by their score (because I think the inverted lists are per partition, not global)? Please forgive me if there is any misunderstanding. Thanks!
Currently in ShapeLuceneRDD all input fields are indexed. However, a common use case is to index only the spatial part of the input.
Implementation will require a zipWithIndex on the RDD data
Done:
While checking my logs, I saw the following events :
2019-03-12 14:24:09,730 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexing process completed at 2019-03-12T14:24:09.730-07:00...
2019-03-12 14:24:09,730 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexing process took 28 seconds...
2019-03-12 14:24:10,046 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=514]Indexed 852063 documents
2019-03-12 14:24:13,465 INFO [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexing process completed at 2019-03-12T14:24:13.465-07:00...
2019-03-12 14:24:13,465 INFO [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexing process took 32 seconds...
2019-03-12 14:24:13,897 INFO [Executor task launch worker-0] partition.LuceneRDDPartition: [partId=366]Indexed 986009 documents
2019-03-12 14:23:36,578 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]Indexing process initiated at 2019-03-12T14:23:36.510-07:00...
2019-03-12 14:23:36,584 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]Indexing process completed at 2019-03-12T14:23:36.582-07:00...
2019-03-12 14:23:36,585 INFO [Executor task launch worker-1] partition.LuceneRDDPartition: [partId=79]**Indexing process took 0 seconds...**
What do you think could be the reason that indexing took 0 seconds / indexed 0 documents, while other partitions seem to have up to 852063 documents? Could this be an example of skewed partitions?
Currently there is no support for storing the queried results from LuceneRDDResponse.
There should be a .toDF() and/or .toDS() method that allows the user to persist the results.
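For example, something like this would cover the use case (a sketch of desired usage; the query and output path are illustrative):
// desired usage: turn the response into a DataFrame and persist it
val response = luceneRDD.termQuery("name", "kabul", 10)
response.toDF().write.mode("overwrite").parquet("/tmp/lucenerdd-results")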
Reported by Volkmar
In phrase search, I need to specify a field name. But in practice I have a lot of columns, and searching each field one by one is not convenient. Does it support searching without specifying a field?
A deduplication method should be introduced in LuceneRDD
Lucene's IndexWriter is thread-safe, so indexing should be multithreaded per executor.
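A rough sketch of what this could look like inside a partition (the docs iterator, the indexWriter variable, the batch size and the thread count are all illustrative):
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

// feed a single IndexWriter from several threads, relying on its thread-safety
val pool = Executors.newFixedThreadPool(4)
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
val futures = docs.grouped(1000).toSeq.map { batch =>
  Future { batch.foreach(doc => indexWriter.addDocument(doc)) }
}
Await.result(Future.sequence(futures), Duration.Inf)
indexWriter.commit()
pool.shutdown()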
Backport the cartesian linker method to version 0.2.X, which supports JVM 7.
Hi all,
I would like to use this and want to know how large an index spark-lucenerdd supports. Could it support millions of documents? I saw that the default config stores the index in memory, while our index seems too large to load into memory at once.
Thanks
Aimee
With the removal of the implicits, it's no longer possible to run the Geo examples:
error: No implicit view available from (Double, Double) => com.spatial4j.core.shape.Shape
Is your feature request related to a problem? Please describe.
Currently, I generate the linker dynamically from the input. If the data for a particular field is not present, I don't include it in the linker.
When I retrieve the results, I would like to have an extra column showing the query that linked the two documents together, so I can debug and improve the queries if needed.
Due to the nature and complexity of Lucene (analyzers, tokens, filters, etc.), I believe this feature would help in the debugging process.
I am looking for something like this :
DataFrame = spark.createDataFrame(linkedResults
.map { case (left, topDocs , query) =>
Or perhaps :
DataFrame = spark.createDataFrame(linkedResults
.map { case (left, topDocs ) =>
topDocs.query
Describe alternatives you've considered
Logging is an option, but I would need to link the queries to the results manually from the logs, and this is not really possible or practical.
Another option is to create a column and store the query before the execution of the linker. The only problem with this solution is that I would need to run the function that generates the linker twice: once when preparing the data (excluding this column from the analysis) and again when the actual linkage happens. For a large input this could be very time consuming. Still, this is the more realistic alternative, and I believe I could implement it right now.
It seems really helpful to me. But when I try to use it in Java, e.g., initializing with new LuceneRDD(df.RDD()), it seems to need some other arguments, such as partitionsRDD, indexAnalyzer, queryAnalyzer and similarity. How do I set these arguments?
Thanks.
Describe the bug
I am running the LinkageAbtvsBuy example as is, but I am receiving the following exception at runtime:
2019-01-23 11:18:01 ERROR Executor:91 - Exception in task 1.0 in stage 6.0 (TID 9)
java.lang.NoSuchMethodError: org.apache.lucene.index.IndexWriter.addDocument(Ljava/lang/Iterable;)V
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategoryDocument(DirectoryTaxonomyWriter.java:496)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.internalAddCategory(DirectoryTaxonomyWriter.java:457)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.addCategory(DirectoryTaxonomyWriter.java:424)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:208)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:287)
at org.apache.lucene.facet.taxonomy.directory.DirectoryTaxonomyWriter.<init>(DirectoryTaxonomyWriter.java:304)
at org.zouzias.spark.lucenerdd.store.IndexWithTaxonomyWriter$class.taxoWriter(IndexWithTaxonomyWriter.scala:37)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter$lzycompute(LuceneRDDPartition.scala:47)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.taxoWriter(LuceneRDDPartition.scala:47)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:70)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$$anonfun$3.apply(LuceneRDDPartition.scala:67)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition.<init>(LuceneRDDPartition.scala:67)
at org.zouzias.spark.lucenerdd.partition.LuceneRDDPartition$.apply(LuceneRDDPartition.scala:243)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
at org.zouzias.spark.lucenerdd.LuceneRDD$$anonfun$6.apply(LuceneRDD.scala:347)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
To Reproduce
Run LinkageAbtvsBuy with spark-lucenerdd version 0.2.8 and Spark 2.1.0.
Expected behavior
It should not produce such an exception.
Can we try the same code with pattern matching for suffixes, or something like regular expressions? If anyone has tried a different option, please let me know.
Currently, term vectors are not stored.
As a user, I want to be able to store term vector information.
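At the Lucene level this roughly corresponds to enabling term vectors on the indexed fields (a sketch; the field name and index options are illustrative):
import org.apache.lucene.document.{Document, Field, FieldType}
import org.apache.lucene.index.IndexOptions

// a field type that indexes the text and also stores its term vectors
val fieldType = new FieldType()
fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS)
fieldType.setTokenized(true)
fieldType.setStoreTermVectors(true)
fieldType.freeze()

val doc = new Document()
doc.add(new Field("name", "hello world", fieldType))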