hyperspace's Introduction

Hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.

aka.ms/hyperspace

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.opensource.microsoft.com.

When you submit a pull request, a CLA bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., status check, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

Please review our contribution guide.

Development on Windows

This repository contains symbolic links which don't work properly on Windows. To build this project on Windows, you can use our provided Git aliases to replace symbolic links with junctions.

$ git config --local include.path ../dev/.gitconfig
$ git replace-symlinks # replace symlinks with junctions
$ #git restore-symlinks # use this to restore symlinks if you need

Using IntelliJ

You can use the built-in sbt shell in IntelliJ without any problems. However, the built-in "Build Project" command may not work. To fix the issue, go to Project Structure -> Project Settings -> Modules and follow these steps:

  • Mark src/main/scala and src/main/scala-spark2 as "Sources" and src/test/scala and src/test/scala-spark2 as "Tests" for the spark2_4 module.
  • Mark src/main/scala and src/main/scala-spark3 as "Sources" and src/test/scala and src/test/scala-spark3 as "Tests" for the spark3_0 module.
  • Remove the root and hyperspace-sources modules.

Additionally, you might have to run sbt buildInfo if you encounter an error like object BuildInfo is not a member of package com.microsoft.hyperspace for the first build.

Inspiration and Special Thanks

This project would not have been possible without the outstanding work from the following communities:

  • Apache Spark: Unified Analytics Engine for Big Data, the engine that Hyperspace builds on top of.
  • Delta Lake: Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. Hyperspace derives quite a bit of inspiration from the way the Delta Lake community operates and pioneering of some surrounding ideas in the context of data lakes (e.g., their novel use of optimistic concurrency).
  • Databricks: Unified analytics platform. Many thanks to all the inspiration they have provided us.
  • .NET for Apache Spark™: Hyperspace offers .NET bindings for developers, thanks to the efforts of this team in collaborating and releasing the bindings just-in-time.
  • Minimal Mistakes: The awesome theme behind Hyperspace documentation.

Code of Conduct

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

License

Apache License 2.0, see LICENSE.

hyperspace's People

Contributors

affogarty, alex-shchetkov, andrei-ionescu, apoorvedave1, clee704, hyperspace-bot, imback82, justinbreese, kaustubhkhare, microsoftopensource, paryoja, pirz, rapoth, saucam, sezruby, thrajput, thugsatbay, veysiertekin

hyperspace's Issues

Add support for deletes in RefreshIndex for incremental mode

Describe the issue

Once some files or partitions are removed from source data, fix the index and remove corresponding index records from the index. This action is triggered by calling refreshIndex API.

Overview.
Using index metadata, we find which data files are removed.
Using lineage in index, we can find index records which were generated from removed files' data records. We re-generate affected index files by removing such records.
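The two steps in the overview can be sketched in plain Scala. This is a minimal illustration, not Hyperspace's implementation: IndexRecord, deletedFiles, and dropDeleted are hypothetical names, and real index records live in Spark DataFrames rather than in-memory sequences.

```scala
object DeleteRefreshSketch {
  // Hypothetical index record: the indexed key plus a lineage column
  // naming the source data file the record was generated from.
  final case class IndexRecord(key: Int, sourceFile: String)

  // Files recorded in index metadata that no longer exist in the source data.
  def deletedFiles(indexedFiles: Set[String], currentFiles: Set[String]): Set[String] =
    indexedFiles -- currentFiles

  // Keep only the index records whose lineage points at a surviving file.
  def dropDeleted(records: Seq[IndexRecord], deleted: Set[String]): Seq[IndexRecord] =
    records.filterNot(r => deleted.contains(r.sourceFile))
}
```

In this sketch, only the index files that contain at least one record from a deleted source file would need to be rewritten.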

Related Issues.

  • Add Lineage to index records (issue #104)
  • Remove deleted records during refresh (issue #133)
  • Merge refresh action for delete to index refresh (issue #149)

To Reproduce

N/A

Expected behavior

  1. Create an index on top of some partitioned data.
  2. Remove some files and/or partitions from source data.
  3. Call refreshIndex on index.
  4. Upon successful termination of refresh index, records corresponding to the deleted records from source data should no longer be present in the index and the index should be in active and usable state.

Environment

All environments.

Add signature computation support for arbitrary logical plans

Describe the issue

Currently, Hyperspace only supports generating plan signature for logical plans consisting of only logical relation nodes (Check FileBasedSignatureProvider).
Moving forward, Hyperspace needs to support generating signature for arbitrary logical plans.

To Reproduce

NA

Expected behavior

Signature computation code supports arbitrary logical plans and not just plans consisting of logical relation nodes.

Environment

All

Implement smart refresh indexes of append-only data

Describe the issue

Implement refresh(mode = "smart") for refreshing indexes over incremental data. When the user calls this API, the smart mode performs the work of #110 and #111 in a single step.
It uses the new unindexed data along with the small files from the previous index snapshot to create a new index version.

Thanks @sezruby for suggesting this.

To Reproduce

Expected behavior

Environment

Create a sample Hyperspace "HelloWorld" application

Describe the issue

Write a sample Hyperspace HelloWorld application that covers a simple, realistic scenario.
At a minimum, the app should create some example indexes on sample data and demonstrate how those indexes can be maintained and used.

Main goal for the app is to show how one can import and use Hyperspace within a Spark application.

To Reproduce

NA

Expected behavior

An example (and ideally configurable) Spark application that demonstrates the core features and capabilities of Hyperspace.

Environment

All environments.

Use a clock trait in index cache

Describe the issue

Add a clock trait to the code and modify CreationTimeBasedIndexCache to use the clock, with system time as the default. Test cases under IndexCacheTest should be modified to not use Thread.sleep.
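A clock abstraction of this kind might look like the sketch below. All names here (Clock, SystemClock, ManualClock, ExpiringCache) are assumptions made for illustration; ExpiringCache is a toy stand-in, not the actual CreationTimeBasedIndexCache.

```scala
object ClockSketch {
  // Minimal clock abstraction; the trait and method names are illustrative.
  trait Clock { def nowMillis(): Long }

  // Default implementation backed by system time.
  object SystemClock extends Clock {
    def nowMillis(): Long = System.currentTimeMillis()
  }

  // Test clock that only moves when told to, so tests need no Thread.sleep.
  final class ManualClock(private var current: Long = 0L) extends Clock {
    def nowMillis(): Long = current
    def advance(millis: Long): Unit = current += millis
  }

  // Toy cache whose entry expires a fixed time after it was set.
  final class ExpiringCache[T](expiryMillis: Long, clock: Clock = SystemClock) {
    private var entry: Option[(T, Long)] = None

    def set(value: T): Unit = entry = Some((value, clock.nowMillis()))

    // Returns the value only while it is younger than expiryMillis.
    def get(): Option[T] = entry.collect {
      case (value, createdAt) if clock.nowMillis() - createdAt < expiryMillis => value
    }
  }
}
```

A test can then construct the cache with a ManualClock and call advance to cross the expiry boundary deterministically.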

To Reproduce

NA

Expected behavior

CreationTimeBasedIndexCache should be modified to use a clock.

Thread.sleep usage should be removed from index cache test cases.

Environment

Index Ranking for Filter Index Rule

Describe the issue

Improve the ranking algorithm used to select indexes in the Filter Index rule. The current rule picks the first index that contains a column from the filter predicates. Instead, rank the candidate indexes and prefer one that contains more of the columns used in the filter predicates.
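One possible scoring scheme, sketched under the assumption that an index can be summarized by its name and indexed columns, is to count how many filter columns each candidate covers and take the maximum. IndexInfo and bestIndex are hypothetical names, and a real rule would likely weigh other factors as well (for example column order or index size).

```scala
object FilterIndexRankingSketch {
  // Hypothetical, simplified view of an index: its name and indexed columns.
  final case class IndexInfo(name: String, indexedColumns: Seq[String])

  // Pick the candidate that covers the most filter columns.
  // Returns None when no candidate contains any filter column.
  def bestIndex(filterColumns: Set[String], candidates: Seq[IndexInfo]): Option[IndexInfo] = {
    val scored = candidates
      .map(idx => (idx, idx.indexedColumns.count(filterColumns.contains)))
      .filter { case (_, score) => score > 0 }
    if (scored.isEmpty) None else Some(scored.maxBy(_._2)._1)
  }
}
```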

To Reproduce

NA

Expected behavior

NA

Environment

NA

deleteOldVersions API

Describe the problem
After refreshing an index, the old version of the index remains on the storage. We should keep the old versions to support consistency & isolation of index data, but at some point in time, they're no longer needed.
So it would be good if there's an API to clean up the old versions.

Describe your proposed solution

API design

def deleteOldVersions(indexName: String)

hs.deleteOldVersions("indexName")

But there's no API to show the list of versions. I think it would be great to provide an API for index statistics so that a user can check the size of the index, existing versions, creation time, last used time (from the event log), etc.

Currently, hs.index("indexName") returns an "indexContentPaths" column that shows the paths referred to by the latest index version.
Based on that information, we could validate the given versions and determine which versions we should delete.

Additional context

Add documentation for FilterIndexRule

Describe the issue

Write a readme file for FilterIndexRule .
The description should describe patterns that the rule looks for in a query plan and how the query plan is transformed to leverage an index.

To Reproduce

NA

Expected behavior

Hyperspace documentation should be extended with a readme file on FilterIndexRule.

Environment

NA

Add optimization rule for Aggregations

Describe the issue

Hash based aggregations work on same core principles as Hash based Join Algorithms like SortMergeJoin. Bucketing can be utilized for such Aggregation nodes also. Investigate and Add AggregationIndexRule which utilizes bucketized indexes.

To Reproduce

NA

Expected behavior

NA

Environment

NA

Inconsistent behavior in python's hyperspace.explain after df.explain from scala version of hyperspace.explain

Describe the issue

The hyperspace.explain(df) API prints the same plan in both the indexed and non-indexed sections when it is called after Hyperspace.enable(spark) and df.explain().

To Reproduce

In pyspark, do below.

  1. Enable hyperspace.
  2. Create a dataframe that is expected to have applied indexes.
  3. Do df.explain
  4. Do hyperspace.explain(df...)

Reproducible in Synapse.

Expected behavior

In Scala, hyperspace.explain(df) still shows both the non-indexed and the indexed plan even after df.explain has been called.

Environment

Spark 2.4.4.2.6 and Python 3.6 in Synapse.

Rule Based Index Recommender

Describe the issue

Build a rule based Index Recommender. Rule-Based means it just looks at the query plans of your workload and suggests indexes based on that.
(The alternative would be a cost-based recommender, which would also look at the data and its properties, statistics, etc. to suggest indexes.)

To Reproduce

NA

Expected behavior

NA

Environment

na

Deduplicate getIndexesForPlan() and findCoveringIndexes() functions which are duplicated in FilterIndexRule and JoinIndexRule

Describe the issue

FilterIndexRule.findCoveringIndexes() function and JoinIndexRule.getIndexesForPlan() function are similar in implementation.
Refactor them as common code in a single function and change the classes to use that.

To Reproduce

NA

Expected behavior

FilterIndex rule and JoinIndex rule implementation under rules should not have duplicate code for finding covering indexes.

Environment

NA

Store index file names and file sizes in operation log

Describe the issue

Upcoming features require all index files to be listed in the index metadata. The size of each individual file is also required. A sample of this could be:

{
 ...
 "files": ["root/indexfile1:100MB", "root/indexFile2:1MB"...]
...
}

To Reproduce

Enhancement

Expected behavior

Index creation / modification adds list of created index files to the operation log along with their file size

Environment

na

Add Alias column support in join condition for JoinIndex rule

Describe the issue
Add support for org.apache.spark.sql.catalyst.expressions.Alias as column type in JoinIndexRule.scala. Currently, this rule only supports Join Conditions on org.apache.spark.sql.catalyst.expressions.Attributes . Consider a case where a column c from the base table has an alias aliasC. A Join query with c will be able to utilize indexes but a query with aliasC won't be.

To Reproduce
Create a query plan of the form

select T1.A, T2Temp.B 
from T1 
INNER JOIN (
   select c as aliasC 
   from T2
) as T2Temp 
where T1.C = T2Temp.aliasC

Expected behavior
Query utilizes index

Screenshots
NA
Desktop (please complete the following information):
NA
Additional context
NA

Add RefreshIncrementalAction class to support index creation on newly appended data

Describe the issue

Implement incremental indexing support for append-only data. Implementation outline:

  • identify newly added data files
  • create new index version on these files
  • update metadata to reflect the latest snapshot of index to include old files and new files

New Api: refresh(mode = quick)
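The outline above can be sketched without Spark as simple set arithmetic over file paths. IndexSnapshot, appendedFiles, and nextSnapshot are hypothetical names, and the sketch assumes strictly append-only data (no source file is ever removed or rewritten).

```scala
object IncrementalRefreshSketch {
  // Hypothetical snapshot of index metadata: the source files already
  // covered by the index and the index files built so far.
  final case class IndexSnapshot(sourceFiles: Set[String], indexFiles: Set[String])

  // Source files present now but not covered by the previous snapshot.
  def appendedFiles(previous: IndexSnapshot, currentSourceFiles: Set[String]): Set[String] =
    currentSourceFiles -- previous.sourceFiles

  // New snapshot that keeps the old index files and adds the ones
  // built from the appended data.
  def nextSnapshot(
      previous: IndexSnapshot,
      currentSourceFiles: Set[String],
      newIndexFiles: Set[String]): IndexSnapshot =
    IndexSnapshot(currentSourceFiles, previous.indexFiles ++ newIndexFiles)
}
```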

To Reproduce

na

Expected behavior

na

Environment

na

Automatic Periodic compaction of indexes

Describe the issue

Blocked on #60
After a few rounds of incremental indexing, Hyperspace should automatically recognize that the number of index files has grown too large and that it is time to optimize() the indexes. On a later incremental indexing call (e.g. every 5th or 10th call), an auto compactor should compact them by calling optimize or something similar.

To Reproduce

Expected behavior

User doesn't need to call optimize() themselves. It runs automatically periodically.

Environment

NA

Add C# Notebook for Hyperspace

Describe the issue

Currently, there is only a Scala notebook provided that demonstrates the e2e functionality of Hyperspace. It would be good to have a C# notebook as well.

To Reproduce

N/A

Expected behavior

N/A

Environment

N/A

Add Config file for Hyperspace

Describe the issue

Unify all Hyperspace related configurations in a single configuration file and add them to spark.conf.
Check here for a list of Hyperspace configurations.

Today, for changing any Hyperspace related config, user needs to manually add it to spark configurations. For example:
spark.conf.set(IndexConstants.INDEX_SYSTEM_PATH, "some valid path")

Ideally, we want these config overrides to be added to a config file, and by enabling Hyperspace they all get added to spark conf automatically (without any action from user side).

To Reproduce

NA

Expected behavior

A user should be able to add a configuration file and edit it with Hyperspace configurations and the configs should be added automatically to Spark Session.

Environment

NA

Add stable state check in IndexLogManager.createLatestStableLog()

Describe the issue

In IndexLogManager.createLatestStableLog() make sure the log being copied has a stable state. The api currently makes a copy of whatever log id it's passed, without 'assert'ing that the state of this log is stable (i.e. ACTIVE, DELETED, DOESNOTEXIST)
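The requested check could be sketched as follows. The state names come from the issue text; LogEntry and latestStableCandidate are hypothetical and do not mirror the real IndexLogManager API.

```scala
object StableLogSketch {
  // Stable state names as listed in the issue; everything else is illustrative.
  val StableStates: Set[String] = Set("ACTIVE", "DELETED", "DOESNOTEXIST")

  // Hypothetical, simplified log entry.
  final case class LogEntry(id: Int, state: String)

  // Returns the entry to copy only when its state is stable;
  // otherwise returns a message describing why the copy was refused.
  def latestStableCandidate(entry: LogEntry): Either[String, LogEntry] =
    if (StableStates.contains(entry.state)) Right(entry)
    else Left(s"Log ${entry.id} is in non-stable state ${entry.state}")
}
```

Returning an Either rather than asserting keeps the decision about how to fail (exception, false return value, log message) with the caller.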

To Reproduce

NA

Expected behavior

NA

Environment

NA

Add Python Notebook for Hyperspace

Describe the issue

Currently, there is only a Scala notebook provided that demonstrates the e2e functionality of Hyperspace. It would be good to have a Python notebook as well.

To Reproduce

N/A

Expected behavior

N/A

Environment

N/A

Provide readme with design explanation for explain API

Describe the issue

Write an explanation of the explain() API. Check Hyperspace.scala.
The description should include a design overview of the API, provide examples of invoking it, and explain how the output should be interpreted.

To Reproduce

NA

Expected behavior

Hyperspace documentation should be extended with explain() API readme.

Environment

NA

Add documentation for concurrency model

Describe the issue

Write a readme file for concurrency model in Hyperspace.
The description should describe important design details on how concurrent index modification actions are handled in Hyperspace along with examples.

To Reproduce

NA

Expected behavior

Hyperspace documentation should be extended with a readme file on concurrency model.

Environment

NA

Revisit logical plan serialization

Describe the issue

Hyperspace serializes the logical plan of a dataframe used for creating an index so that the plan can be reused for refreshing the index, etc. The current implementation uses the Kryo serializer to serialize a logical plan object: https://github.com/microsoft/hyperspace/blob/master/src/main/scala/com/microsoft/hyperspace/index/serde/LogicalPlanSerDeUtils.scala

However, this is not an ideal approach since the serialized bytes are not compatible across Scala versions (2.11 vs. 2.12) and Spark versions (2.4 vs. 3.0). In addition, since those logical plan classes are internal, they could theoretically be changed (or renamed) across patch versions. In other words, compatibility just doesn't work.

I think representing it as JSON instead should serve our purpose and I am open for any other feedback.

Add an API to determine if an index is invalid (i.e., if base table changed).

Describe the issue

Add an API like hs.isValid(indexName) which returns true or false depending on whether the index is valid.
An Index can become invalid if the original data is changed. This would lead to signature mismatch and index would become unusable.
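A validity check based on signature mismatch might be sketched like this. It assumes, for illustration only, that the signature is an MD5 digest over each source file's path, size, and modification time; FileInfo, signature, and isValid are hypothetical names rather than Hyperspace APIs.

```scala
import java.security.MessageDigest

object SignatureSketch {
  // Hypothetical description of one source data file.
  final case class FileInfo(path: String, sizeBytes: Long, modifiedTime: Long)

  // Order-independent fingerprint: MD5 over the sorted file descriptions.
  def signature(files: Seq[FileInfo]): String = {
    val digest = MessageDigest.getInstance("MD5")
    files
      .map(f => s"${f.path}:${f.sizeBytes}:${f.modifiedTime}\n")
      .sorted
      .foreach(s => digest.update(s.getBytes("UTF-8")))
    digest.digest().map("%02x".format(_)).mkString
  }

  // The index is considered valid while the stored signature still
  // matches the signature of the current source files.
  def isValid(storedSignature: String, currentFiles: Seq[FileInfo]): Boolean =
    storedSignature == signature(currentFiles)
}
```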

To Reproduce

Expected behavior

Screenshots

Desktop (please complete the following information):

Add documentation for JoinIndexRule

Describe the issue

Write a readme file for JoinIndexRule .
The description should describe patterns that the rule looks for in a query plan and how the query plan is transformed to leverage indexes.

To Reproduce

NA

Expected behavior

Hyperspace documentation should be extended with a readme file on JoinIndexRule.

Environment

NA

Hybrid Scans: Use indexes of partially indexed data, and use original data for unindexed data

Describe the issue

Currently if new additional data gets added to the old data, the indexes on the older part of data become obsolete. Hyperspace doesn't use this stale index at all. Investigate and find if some form of "Hybrid" scanning could be done to utilize the partially created indexes if possible and if it improves performance.

To Reproduce

NA

Expected behavior

If new data is added, indexes from the older data are still usable along with additional work done for the newly arrived data

Environment

NA

Add full list of index files to index log entry

Describe the issue

Currently, as part of index metadata, we store only the root path of the index files, i.e. the path to the directory containing them, in the index log entry. Check here.

We need to change this and store the full list of all index files in the metadata.

To Reproduce

N/A

Expected behavior

Upon successful creation of a covering index, index metadata should contain a full list of index files under content in the corresponding index log entry.

Environment

All environments.

SparkSQL based index creation

Describe the problem
#95 addressed the serialization / deserialization of DataFrame for refresh index.
It seems we need some intermediate representation for the source DataFrame to support Spark/Scala version compatibility which is not guaranteed by available serializers in Spark - KryoSerializer or JavaSerializer.
Since design & implementation of IR might take some time, it would be good to introduce the alternative way using sparkSQL, suggested by @rapoth #95 (comment)

For now, sql based index creation will still be limited to only covering index on a LogicalRelation, though SparkSql can cover various plans such as Filter & Join .. etc. (will address indexes on arbitrary plan later)

Describe your proposed solution

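// createIndex API
def createIndex(sourceSql: String, indexConfig: IndexConfig): Unit = {
  // Build the source DataFrame from the given SQL text.
  val df = spark.sql(sourceSql)
  // Keep the SQL in the config so that it is stored in IndexLogEntry.
  indexManager.create(df, indexConfig.copy(sourceSql = sourceSql))
}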

hs.createIndex(sourceSql, IndexConfig(..))

// new field in IndexConfig (to keep the sql in IndexLogEntry)
case class IndexConfig(
    indexName: String,
    indexedColumns: Seq[String],
    includedColumns: Seq[String] = Seq(),
    sourceSql: String = "")

// example
spark.read.parquet("table2").createTempView("table2") // createTempView returns Unit, so there is no result to bind
hs.createIndex("select * from table2", IndexConfig("indexName", Seq("id"), Seq("name")))

Describe alternatives you've considered

Additional context

Optimize Indexes support

Describe the issue

Blocked by #29
Once hyperspace has an incremental indexing support, add support for optimizing indexes. This would pick parts of indexes which are made of large number of small files, and combine/compact them into small number of large files. This would improve the performance of the index since the number of index files would be greatly reduced

To Reproduce

NA

Expected behavior

hyperspace.optimize(indexName) would update indexes to improve their performance

Environment

NA

Hyperspace Create Index Fails for csv if data is materialized before

Describe the issue

Probable root cause of dotnet/spark#580
Creating a Hyperspace index on a CSV dataset fails if the data is materialized just before the index is created.

To Reproduce

I could repro the issue only if I call df.show() before hs.createIndex(). If I don't call df.show(), it works fine and creates index without issue.

object Test extends App {
  val spark: SparkSession = ...
  
  val df = spark.read.csv("csv/*/*")
  df.show()                            // commenting this line eliminates the issue
  val hs = new Hyperspace(spark)
  hs.createIndex(df, new IndexConfig("ic2", Seq("_c0"), Seq()))
}

Expected behavior

New index created and app will show in the log.

Additional Context

com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.spark.util.MutableURLClassLoader)
classLoader (org.apache.hadoop.conf.Configuration)
conf (org.apache.hadoop.io.compress.BZip2Codec)
codecs (org.apache.hadoop.io.compress.CompressionCodecFactory)
codecFactory (org.apache.spark.sql.execution.datasources.csv.CSVFileFormat)
fileFormat (com.microsoft.hyperspace.index.serde.package$HadoopFsRelationWrapper)
relation (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)
...
Caused by: java.util.ConcurrentModificationException
at java.util.Vector$Itr.checkForComodification(Unknown Source)
at java.util.Vector$Itr.next(Unknown Source)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:99)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79)

Add documentation for progress (operation) log

Describe the issue

Write a readme file for the structure of progress log in Hyperspace.
The description should describe the format of progress (operation) log for indexes along with examples.

To Reproduce

NA

Expected behavior

Hyperspace documentation should be extended with a readme file on progress log format.

Environment

NA

Incremental Indexing: Support for incremental changes to index when new data is added

Describe the issue

Currently, when new data is added, the indexes become obsolete. The only way to make them usable again is the refresh() API, which recreates the complete index from scratch. Add functionality to index only the newly added data instead of recreating the index from scratch.

Expected behavior

Adding new data to existing data doesn't require rebuilding indexes over the complete data. Only the additional data gets indexed.

Intermediate issues and PRs

  • Capture source file info to detect the change of the dataset #120
  • The root paths captured in index log entry should always be absolute paths #126
  • Store index file names and file sizes in operation log #107
  • Add full list of index files to index log entry #103
  • Index metadata should always store absolute paths (alternate solution) #128, #127
  • Store file name, size and modified time of index files in metadata #122
  • New Tree based Structure for index metadata #139
  • Index merge functionality PR: #162
  • Incremental indexing support for Append-only data #110 , PR: #163

Remove test dependency on IndexLogManager.getLatestLog() and mark it as final.

Describe the issue

Code Cleanup:
Remove test dependency on IndexLogManager.getLatestLog() and mark it as final.
Currently tests extend IndexLogManager class and override some methods of this class like getLatestLog. Improve this design so that source classes (e.g. IndexLogManager) should not depend on tests to keep their behavior.

To Reproduce

NA

Expected behavior

NA

Environment

NA

Quick Optimize support for append-only data

Describe the issue

Append-only = data is only appended with new data, and never deleted or updated
Depends on #110

After many appends and refresh calls, there will be many small index files per bucket. Implement an optimize(mode = "quick") routine which compacts a large number of small index files into larger files per bucket, reducing the number of index files and improving performance.

Outline:

  • Identify small index files for every bucket
  • merge small files to larger files bucketwise
  • update the index metadata to use these larger files instead of small files, along with any unmodified files from the previous index snapshot.
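The planning part of this outline can be sketched in plain Scala. IndexFile, plan, and the size threshold are assumptions for illustration; the actual merge would be carried out by Spark, and the threshold would presumably come from configuration.

```scala
object QuickOptimizeSketch {
  // Hypothetical description of one index file and the bucket it belongs to.
  final case class IndexFile(path: String, bucket: Int, sizeBytes: Long)

  // Splits the files into (small files to merge, grouped by bucket)
  // and (files carried over unchanged into the new snapshot).
  def plan(
      files: Seq[IndexFile],
      smallThresholdBytes: Long): (Map[Int, Seq[IndexFile]], Seq[IndexFile]) = {
    val (small, large) = files.partition(_.sizeBytes < smallThresholdBytes)
    val byBucket = small.groupBy(_.bucket)
    // A bucket with a single small file gains nothing from merging.
    val (toMerge, single) = byBucket.partition { case (_, fs) => fs.size > 1 }
    (toMerge, large ++ single.values.flatten)
  }
}
```

The new index metadata would then list one merged file per entry in the first result plus every file in the second.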

Add utility to normalize conditions into CNF

Describe the issue

Add utility code to normalize a condition into conjunctive normal form (CNF).

Optimization rules could benefit from this code when checking and comparing arbitrary conditions.
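As a rough illustration of the transformation, the sketch below normalizes a toy boolean expression tree into CNF by first pushing negations down to the atoms and then distributing OR over AND. The Expr types are a stand-in defined only for this example; a real utility would operate on Spark's Catalyst expressions, and this naive distribution can grow exponentially on large conditions.

```scala
object CnfSketch {
  // Toy expression tree used only for this example.
  sealed trait Expr
  final case class Atom(name: String) extends Expr
  final case class Not(e: Expr) extends Expr
  final case class And(l: Expr, r: Expr) extends Expr
  final case class Or(l: Expr, r: Expr) extends Expr

  // Push negations down to the atoms (negation normal form).
  private def nnf(e: Expr): Expr = e match {
    case Not(Not(x))    => nnf(x)
    case Not(And(l, r)) => Or(nnf(Not(l)), nnf(Not(r)))
    case Not(Or(l, r))  => And(nnf(Not(l)), nnf(Not(r)))
    case And(l, r)      => And(nnf(l), nnf(r))
    case Or(l, r)       => Or(nnf(l), nnf(r))
    case other          => other // Atom or Not(Atom)
  }

  // Distribute OR over AND; both arguments must already be in CNF.
  private def distribute(l: Expr, r: Expr): Expr = (l, r) match {
    case (And(a, b), _) => And(distribute(a, r), distribute(b, r))
    case (_, And(a, b)) => And(distribute(l, a), distribute(l, b))
    case _              => Or(l, r)
  }

  // Convert an expression that is already in negation normal form.
  private def cnfOfNnf(e: Expr): Expr = e match {
    case And(l, r) => And(cnfOfNnf(l), cnfOfNnf(r))
    case Or(l, r)  => distribute(cnfOfNnf(l), cnfOfNnf(r))
    case other     => other
  }

  def toCnf(e: Expr): Expr = cnfOfNnf(nnf(e))
}
```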

To Reproduce

NA

Expected behavior

util code should be extended by adding code to convert an arbitrary condition into CNF.

Environment

Handle Case sensitivity in indexed and included column names appropriately

Describe the issue

Handle case-sensitivity in indexed and included column names appropriately based on the value of "spark.sql.caseSensitive".

Here's the desired behavior
Q. If case-sensitive = true, index config is wrong case, will createIndex work? No. Fail, exception.
Q. If case-sensitive = false, index config is wrong case, will createIndex work? Yes.
Q. If case-sensitive = true, an index exists with wrong case, will this index be used? No. Ignore this index
Q. If case-sensitive = false, an index exists with wrong case, will this index be used? Yes.
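The createIndex half of these rules can be sketched as a name-resolution helper. This is a simplified illustration: resolve is a hypothetical function, the caseSensitive flag stands in for the value of "spark.sql.caseSensitive", and column names are plain strings rather than Spark attributes.

```scala
object CaseSensitivitySketch {
  // Resolve the requested column names against the schema.
  // Right: the schema's own spelling of every requested column.
  // Left: the requested names that could not be resolved.
  def resolve(
      requested: Seq[String],
      schema: Seq[String],
      caseSensitive: Boolean): Either[Seq[String], Seq[String]] = {
    def find(name: String): Option[String] =
      if (caseSensitive) schema.find(_ == name)
      else schema.find(_.equalsIgnoreCase(name))

    val missing = requested.filter(find(_).isEmpty)
    if (missing.nonEmpty) Left(missing) else Right(requested.flatMap(find(_)))
  }
}
```

A Left result corresponds to the "Fail, exception" case for createIndex, or to "Ignore this index" when the same check is applied to an existing index at query time.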

To Reproduce

NA

Expected behavior

Same as the desired behavior listed under "Describe the issue".

Environment

NA

Hyperspace.explain / df.explain doesn't work as expected

Describe the issue
Hyperspace.explain / df.explain doesn't pick up the filter index as expected.

To Reproduce

  import spark.implicits._
  Seq((1, "name1"), (2, "name2")).toDF("id", "name").write.mode("overwrite").parquet("table")
  val df = spark.read.parquet("table")

  val hs = new Hyperspace(spark)

  hs.createIndex(df, IndexConfig("index", indexedColumns = Seq("id"), includedColumns = Seq("name")))
  hs.indexes.show

  val query = df.filter(df("id") === 1).select("id", "name")
  hs.explain(query, verbose = true) // doesn't pick up "index"

  // Now utilize the indexes.
  spark.enableHyperspace
  query.explain // doesn't pick up "index"
  query.show // DOES pick up "index"

Looks like if the project is the same as filter output, Project seems to be removed when the rule runs, thus FilterIndexRule cannot match. show() seems to work fine.

The following also works fine: val query = df.filter(df("id") === 1).select("id")

Expected behavior
It should pick up the index in Hyperspace.explain or df.explain to avoid confusion.

Screenshots
N/A

Desktop (please complete the following information):
Tested with Apache Spark 2.4.6 / Hyperspace 0.1.0.

Additional context
N/A

Use FileContext api instead of FileSystem for atomic renames (in IndexLogManager).

Describe the issue

The operation log in Hyperspace relies on 'atomic rename' of log files to support concurrent operations. These operations use the org.apache.hadoop.fs.FileSystem.rename() API, which doesn't provide atomicity guarantees as strong as those of org.apache.hadoop.fs.FileContext.rename().

Expected behavior

Better atomicity guarantee

More Details

From org.apache.spark.sql.execution.streaming.CheckpointFileManager, which also relies on atomic renames of checkpoints (similar to atomic renames of hyperspace operation logs),

// Try to create a [checkpoint] manager based on FileContext [instead of FileSystem] because HDFS's FileContext.rename()
// gives atomic renames, which is what we rely on for the default implementation
// CheckpointFileManager.createAtomic

Environment

NA

Enhance logging throughout the code

Describe the issue

Modify Hyperspace code to enhance logging.

To Reproduce

NA

Expected behavior

Logging behavior should be improved throughout the code base.

Environment

KryoSerializer is not compatible with the different versions of Spark

Describe the issue

KryoSerializer is not fully compatible with the different versions of Spark. It's used to serialize/deserialize the logical plan of Index to support refresh operation. This problem will be addressed more generally with #95.
For now, we should replace the ser/de implementation based on the current limitation - Covering Index on LogicalRelation/HadoopFsRelation.

To Reproduce

  1. create an index with Spark 2.4.6
  2. hs.refreshIndex() or hs.indexes.show with Spark 3.0.0
scala> hs.refreshIndex("testindex")
com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
Serialization trace:
catalogTable (com.microsoft.hyperspace.index.serde.package$LogicalRelationWrapper2)
  at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
  at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
  at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
  at com.microsoft.hyperspace.index.serde.KryoSerDeUtils$.deserialize(KryoSerDeUtils.scala:60)
  at com.microsoft.hyperspace.index.serde.LogicalPlanSerDeUtils$.deserialize(LogicalPlanSerDeUtils.scala:71)
  at com.microsoft.hyperspace.actions.RefreshAction.df$lzycompute(RefreshAction.scala:48)
  at com.microsoft.hyperspace.actions.RefreshAction.df(RefreshAction.scala:46)
  at com.microsoft.hyperspace.actions.RefreshAction.logEntry$lzycompute(RefreshAction.scala:59)
  at com.microsoft.hyperspace.actions.RefreshAction.logEntry(RefreshAction.scala:58)
  at com.microsoft.hyperspace.actions.RefreshAction.event(RefreshAction.scala:81)
  at com.microsoft.hyperspace.actions.Action.run(Action.scala:98)
  at com.microsoft.hyperspace.actions.Action.run$(Action.scala:83)
  at com.microsoft.hyperspace.actions.RefreshAction.run(RefreshAction.scala:31)
  at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1(IndexCollectionManager.scala:75)
  at com.microsoft.hyperspace.index.IndexCollectionManager.$anonfun$refresh$1$adapted(IndexCollectionManager.scala:72)
  at com.microsoft.hyperspace.index.IndexCollectionManager.withLogManager(IndexCollectionManager.scala:131)
  at com.microsoft.hyperspace.index.IndexCollectionManager.refresh(IndexCollectionManager.scala:72)
  at com.microsoft.hyperspace.index.CachingIndexCollectionManager.refresh(CachingIndexCollectionManager.scala:97)
  at com.microsoft.hyperspace.Hyperspace.refreshIndex(Hyperspace.scala:77)
  ... 51 elided
Caused by: java.lang.ArrayIndexOutOfBoundsException

Expected behavior

Environment

Add lineage to covering index records

Describe the issue

For each index record, add lineage to capture the data record it is coming from. Assume data records are organized in Hive partitioned manner, where all the records which have the same value for a given partitioning key are stored in files under the same directory whose name has PartitionKey=VALUE. By adding lineage to index records, we want to add extra columns to each record and store the value of partition key(s) and file name according to the data file a record is coming from.

To Reproduce

N/A

Expected behavior

Once an index is created successfully, each index record should have columns that capture its source data record's partition key value(s) and data file name.

Environment

All environments.

Investigate whether OR condition can use bucketing info for Join optimization.

Describe the issue
Investigate whether OR condition can use bucketing info for Join optimization.
Hyperspace currently supports conditions like select C, D from T1, T2 where T1.A = T2.A AND T1.B = T2.B where this condition in CNF form can be optimized with indexes. Investigation required to test whether select C, D from T1, T2 where T1.A = T2.A OR T1.B = T2.B can also be optimized.

To Reproduce
Not supported yet
Expected behavior
NA
Screenshots
NA
Desktop (please complete the following information):
NA
Additional context
NA
