
rubix's Introduction

RubiX


RubiX is a light-weight data caching framework that can be used by Big Data engines. RubiX uses local disks to provide the best I/O bandwidth to the Big Data engines. RubiX is useful in shared storage architectures where the data execution engine is separate from storage. For example, on public clouds like AWS or Microsoft Azure, data is stored in a cloud store and the engine accesses the data over a network. Similarly, in data centers, Presto runs on a separate cluster from HDFS and accesses data over the network.

RubiX can be extended via plugins to support any engine that accesses data using the Hadoop FileSystem interface. Plugins exist for accessing data on AWS S3, Microsoft Azure Blob Store, and HDFS. RubiX can also be extended to work with other storage systems, including other cloud stores.
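For illustration, an engine that talks to storage through the Hadoop FileSystem interface can be pointed at a RubiX caching FileSystem purely through configuration. The sketch below is a rough example only; the plugin class name, config property, and paths are assumptions, so check the documentation for the exact values for your engine, store, and Hadoop version.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RubixClientSketch
{
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        // Route the s3n:// scheme through a RubiX caching FileSystem.
        // Class and property names here are illustrative assumptions.
        conf.set("fs.s3n.impl", "com.qubole.rubix.hadoop2.CachingNativeS3FileSystem");
        // Local disk directories RubiX may use for cached data (property name assumed).
        conf.set("hadoop.cache.data.dirprefix.list", "/media/ephemeral0/fcache/");

        Path path = new Path("s3n://mybucket/tables/reviews/part-00000");
        FileSystem fs = path.getFileSystem(conf);
        // Reads now flow through RubiX, which caches blocks on local disk.
        fs.open(path).close();
    }
}
```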

Check the User and Developer manuals for more information on getting started.

Supported Engines and Cloud Stores

  • Presto: Amazon S3
  • Spark: Amazon S3
  • Any engine using hadoop-2 (e.g. Hive) can utilize RubiX; Amazon S3 is supported

Resources

Documentation
Getting Started Guide
User Group (Google)

Talks

Talk on Rubix at Strata 2017

Blog Posts

Developers

Slack Channel

The channel is restricted to a few domains. Send an email to the user group or contact us through GitHub issues, and we will add you to the Slack channel.

rubix's People

Contributors

abhishekdas99, arajagopal17, dain, garvit-gupta, harmandeeps, hashhar, jamesrtaylor, jordanw-bq, kvankayala, losipiuk, martint, on99, pvam, raunaqmorarka, rohangarg, rupamk, saumya-sunder, shubhamtagra, sopel39, suryanshagnihotri, vrajat, yashaswaj


rubix's Issues

Mask the local path where files are cached

Currently the local path is constructed by appending the remote file path to the caching directory. For security reasons, we need to mask the local path so that the remote file path cannot be easily inferred from the local file path. One approach is sketched below.
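A minimal sketch of one way to do this (not necessarily how RubiX will implement it): derive the local file name from a hash of the remote path, so the remote path cannot be read back from the cache directory listing.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.security.MessageDigest;

public final class CachePathMasker
{
    // Derive an opaque local cache path by hashing the remote path.
    public static String localCachePath(String cacheDir, String remotePath) throws Exception
    {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        byte[] hash = digest.digest(remotePath.getBytes(StandardCharsets.UTF_8));
        StringBuilder name = new StringBuilder();
        for (byte b : hash) {
            name.append(String.format("%02x", b));
        }
        return Paths.get(cacheDir, name.toString()).toString();
    }
}
```

The mapping from remote path to local file then lives only in the cache metadata, not in the on-disk directory layout itself.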

Inter-node reads for non-local reads

When a read is received at a node for which it is not local, the node should find the owner node of the block and read it from that node. The owner node should read it from remote and cache it.

Reading from another node in the same cluster seems to be faster than reading from S3. The initial expectation was that, at higher concurrency, S3 would outperform reads from another node, but experiments show that it does not.

Experiment: host a FileServer on node1 and access it from node2 to read different part files.

This is for r3.large with enhanced networking:

| clients | #files | request size | s3 time | fileServer time |
|---------|--------|--------------|---------|-----------------|
| 1 | 1 | 130MB | 3188 | 973 |
| 1 | 1 | 10MB | 1437 | 54 |
| 10 | 1 | 130MB | 17680 | 10390 |
| 10 | 1 | 10MB | 7537 | 219 |
| 10 | 5 | 130MB | 17765 | 10746 |
| 10 | 5 | 10MB | 7687 | 238 |

This is for r3.8xlarge with enhanced networking:

| clients | #files | request size | s3 time | fileServer time |
|---------|--------|--------------|---------|-----------------|
| 1 | 1 | 130MB, full file | 2903 | 458 |
| 1 | 1 | 10MB | 1516 | 51 |
| 10 | 1 | 130MB, full file | 3370 | 1735 |
| 10 | 1 | 10MB | 1527 | 129 |
| 10 | 5 | 130MB each | 3464 | 1567 |
| 10 | 5 | 10MB | 1507 | 110 |

Reduce the impact of cache pre-warm on the jobs - Part 2

Parallelize the cache pre-warm process by reading the data directly from the remote filesystem if the block is not found in the remote node's cache.
The node where the task is running will read the data from the remote filesystem and will also send a download request to the remote node (where the file should be cached). The remote node then downloads the file asynchronously.
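A rough sketch of that flow, with all class names hypothetical: the task's node serves the read directly from the remote store and, in parallel, fires a non-blocking request asking the owner node to warm its cache.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelWarmupSketch
{
    private final ExecutorService warmupPool = Executors.newFixedThreadPool(4);

    // 'RemoteStore' and 'OwnerNodeClient' stand in for the remote filesystem and
    // the RPC client to the node that owns this block; both are hypothetical.
    public byte[] read(RemoteStore remoteStore, OwnerNodeClient ownerNodeClient,
                       String path, long offset, int length) throws Exception
    {
        // Ask the owner node to download the block asynchronously; do not wait for it.
        warmupPool.submit(() -> ownerNodeClient.requestDownload(path, offset, length));

        // Serve the caller immediately from the remote filesystem.
        return remoteStore.read(path, offset, length);
    }

    interface RemoteStore { byte[] read(String path, long offset, int length) throws Exception; }
    interface OwnerNodeClient { void requestDownload(String path, long offset, int length); }
}
```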

Central stats collection

From the master node, we should be able to extract stats from all nodes.

For this, we will need the list of all the nodes in BookKeeper.

BookKeeper stats are not accurate

There are a few issues with the BookKeeper stats:

  1. Counters (totalRequests, etc.) need to be made atomic. We need to study the impact of changing them to atomic before rolling out the change (a sketch follows this list)
  2. We need to track how much data a node read on behalf of another node (via the readData call)
  3. Cache hit/miss stats do not account for non-local reads
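For (1), the change itself is mechanical; the open question is the cost of the extra contention on a hot path. A minimal sketch, with field names that are illustrative rather than RubiX's actual ones:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of thread-safe stats counters; not the actual BookKeeper fields.
public class CacheStats
{
    private final AtomicLong totalRequests = new AtomicLong();
    private final AtomicLong cachedRequests = new AtomicLong();
    private final AtomicLong remoteRequests = new AtomicLong();

    public void recordRequest(boolean servedFromCache)
    {
        totalRequests.incrementAndGet();
        if (servedFromCache) {
            cachedRequests.incrementAndGet();
        }
        else {
            remoteRequests.incrementAndGet();
        }
    }

    public double hitRate()
    {
        long total = totalRequests.get();
        return total == 0 ? 0.0 : (double) cachedRequests.get() / total;
    }
}
```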

Cache data only when the node meant to handle it gets the request

Right now we depend on the engine to always schedule a block of a file on the same node, for which two things are used:

  1. Location-aware scheduling, which uses the block location to schedule the task
  2. The number of candidate nodes considered when allocating the work

(1) is taken care of, but for (2) we need to set it to 1, e.g. in Presto we set node-scheduler.min-candidates=1 to ensure that we only pick 1 candidate node.
But this might cause performance issues when the only candidate node is busy and the work cannot be scheduled. To prevent that, and to remove the dependency on (2), two things can be done:

a. We add logic to return multiple hosts for each file block and set the number of these hosts as the candidate node count. This would cause multiple copies of data to be maintained, but only a selected list of nodes would have copies of a particular block.
Or
b. We take the route of saying that only one node saves the data, and if the request goes to any other node, that node reads from remote. For this we will have to make worker nodes cluster-aware, and the FS there should figure out whether the read request should be considered local (in which case it copies data to disk) or should simply read from remote. A sketch of this ownership check follows.
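A minimal sketch of what (b) could look like, assuming a hypothetical ClusterManager that returns the current node list: the FileSystem hashes the block key onto the membership list and caches to disk only when this node is the owner.

```java
import java.util.List;

public class OwnerAwareReadSketch
{
    // Hypothetical: returns a stable, consistently ordered list of node names.
    interface ClusterManager { List<String> getNodes(); }

    private final ClusterManager clusterManager;
    private final String thisNode;

    public OwnerAwareReadSketch(ClusterManager clusterManager, String thisNode)
    {
        this.clusterManager = clusterManager;
        this.thisNode = thisNode;
    }

    // Pick the owner of a block deterministically from the file path and block number.
    String ownerOf(String remotePath, long blockNumber)
    {
        List<String> nodes = clusterManager.getNodes();
        int index = Math.floorMod((remotePath + ":" + blockNumber).hashCode(), nodes.size());
        return nodes.get(index);
    }

    boolean shouldCacheLocally(String remotePath, long blockNumber)
    {
        // Cache to disk only when this node owns the block; otherwise read
        // straight from the remote store without writing a local copy.
        return thisNode.equals(ownerOf(remotePath, blockNumber));
    }
}
```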

PrestoClusterManager wrongly infers master node as worker

PrestoClusterManager has logic that if the v1/node call fails, the node must be a worker. This is true in the ideal case, but in exceptional cases it can fail on the master node too; e.g. we saw a SocketTimeoutException in getNodes, which caused the master node to be inferred as a worker and an empty node list to be returned, causing queries to fail.

We should do at least the following:

  1. Infer the node as a worker only if v1/node returns 404

  2. Add retries if v1/node fails due to exceptions (a sketch combining both is below)
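A minimal sketch of both fixes together, with the endpoint handling and retry policy as assumptions: treat only an explicit 404 from v1/node as "this is a worker", and retry transient failures before concluding anything.

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class NodeRoleProbe
{
    // Returns true only when v1/node definitively answers 404; transient failures are retried.
    static boolean isWorker(String serverUrl) throws Exception
    {
        int attempts = 3;
        for (int i = 0; i < attempts; i++) {
            try {
                HttpURLConnection connection =
                        (HttpURLConnection) new URL(serverUrl + "/v1/node").openConnection();
                connection.setConnectTimeout(2000);
                connection.setReadTimeout(2000);
                int status = connection.getResponseCode();
                if (status == 404) {
                    return true;   // only a 404 means the resource is absent, i.e. a worker
                }
                if (status == 200) {
                    return false;  // the endpoint answered; this is the master
                }
            }
            catch (Exception e) {
                // SocketTimeoutException and friends: retry instead of guessing the role.
            }
            Thread.sleep(1000L * (i + 1));
        }
        throw new IllegalStateException("Could not determine node role from v1/node");
    }
}
```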

Dynamic filter for white-listing locations to cache

Like "hadoop.cache.data.location.blacklist", we need to add "hadoop.cache.data.location.whitelist" and this should be able to take values which can be evaluated at runtime. E.g.
hadoop.cache.data.location.whitelist=mybucket.*/tablename/year=2016/month={currentMonth}

Better behavior in concurrent applications

Right now, if two tasks target the same un-cached block, both see its status as not on disk, both download it and write it to disk, and both update the metadata. Only the metadata write is synchronized.

It can be improved by adding another state to the block status: IN_TRANSIT. Once the CacheManager returns the status of a block as NOT_IN_CACHE to a reader, it will return IN_TRANSIT to all other readers asking for that block's status until it hears back from that first reader.
Of course, we also need some way to handle reader failures, which could otherwise leave the status of a block as IN_TRANSIT forever.
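A minimal sketch of that state machine; the lease timeout is one possible answer to the failed-reader problem mentioned above, and the per-file metadata object it would live in is assumed.

```java
import java.util.HashMap;
import java.util.Map;

public class BlockStateSketch
{
    enum BlockStatus { NOT_IN_CACHE, IN_TRANSIT, CACHED }

    private static final long LEASE_MILLIS = 60_000;

    private final Map<Long, Long> inTransitSince = new HashMap<>();
    private final Map<Long, Boolean> cached = new HashMap<>();

    // The first reader to ask for an uncached block gets NOT_IN_CACHE and becomes
    // responsible for downloading it; everyone else sees IN_TRANSIT until then.
    synchronized BlockStatus getStatus(long block)
    {
        if (cached.containsKey(block)) {
            return BlockStatus.CACHED;
        }
        Long since = inTransitSince.get(block);
        if (since != null && System.currentTimeMillis() - since < LEASE_MILLIS) {
            return BlockStatus.IN_TRANSIT;
        }
        // No one is downloading it (or the previous reader's lease expired):
        // hand the download to this reader.
        inTransitSince.put(block, System.currentTimeMillis());
        return BlockStatus.NOT_IN_CACHE;
    }

    // Called by the downloading reader once the block is on disk.
    synchronized void markCached(long block)
    {
        inTransitSince.remove(block);
        cached.put(block, Boolean.TRUE);
    }
}
```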

Read - Write - Read inconsistency in Spark

When writing to a file in the cache, the application that wrote the data is able to pick up the change, but a different Spark application running on the same cluster is not.
It continues to serve stale data until the application is restarted or another write takes place; in that scenario, all the updates are present, including the most recent write.

Permission denied during RemoteRead

java.util.concurrent.ExecutionException: java.io.FileNotFoundException: /media/ephemeral0/fcache/paid-qubole/default-datasets/memetracker/2008-12/000106 (Permission denied)
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at com.qubole.rubix.core.CachingInputStream.readInternal(CachingInputStream.java:246)
at com.qubole.rubix.core.CachingInputStream.read(CachingInputStream.java:180)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:275)
at java.io.BufferedInputStream.read(BufferedInputStream.java:334)
at java.io.DataInputStream.read(DataInputStream.java:100)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:246)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:396)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:342)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Reduce the impact of cache pre-warm on the jobs - Part 1

Parallelize the cache pre-warm process by reading the data directly from the remote filesystem if the block is not found in the local cache. As part 1, we will implement this for data not present on the local node; later we will implement it for the remote node.

Header path incorrect when compiling Rubix-rpm

Failed to execute goal com.mycila:license-maven-plugin:2.3:check (default) on project rubix-rpm: Resource /Users/aragojopal/Downloads/Dev/Sparkbasics/rubix/rubix-rpm/src/license/header.txt not found in file system, classpath or URL: no protocol: /Users/aragojopal/Downloads/Dev/Sparkbasics/rubix/rubix-rpm/src/license/header.txt

Macro based whitelisting of locations allowed to cache

This is particularly useful for partitioned tables. Today, whitelisting is a regex; e.g. if a user wants to whitelist two tables
that appear in locations reviews and bookings under the same S3 prefix like s3://mybuckets/tables, they could add this to the config:

hadoop.cache.data.location.whitelist=.*mybuckets/tables/(reviews|bookings).*

The problem with this is that if, say, bookings is partitioned by month and has data for many months while the user only wants to cache the
data for the last two months, the user will have to keep updating this config every time the month changes. To solve that, we should provide
macro-based input to this config. E.g. if reviews is partitioned yearly and bookings monthly, and the user wants to enable caching for
only the last 5 years of reviews and the last 2 months of bookings, this should be possible:

hadoop.cache.data.location.whitelist=.*mybuckets/tables/(reviews/year=$lastFiveYears$|bookings/month=$lastTwoMonthsNames$).*

RubiX should evaluate the macros $lastFiveYears$ and $lastTwoMonthsNames$ at runtime and produce the whitelisting config as:

hadoop.cache.data.location.whitelist=.*mybuckets/tables/(reviews/year=(2016|2015|2014|2013|2012)|bookings/month=(October|September)).*

RubiX should provide some of the common functions out of the box, and the system should be extensible with user-defined macros.
E.g. if a particular user has data partitioned by store location as s3://mybucket/tables/stores/location=xyz and wants to
cache data only for stores in Bangalore and Pune, they should be able to write a custom function to do it, add that jar, and use it in the
whitelist as:

hadoop.cache.data.location.whitelist=.*mybuckets/tables/stores/location=$com.myCompany.rubix.myCustomStoreSelector$.*
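A rough sketch of how such macro expansion could work: built-in and user-supplied macros implement one interface, and the whitelist string is expanded before being compiled into the regex. All names here are illustrative, and the month formatting is simplified (upper-case names).

```java
import java.time.LocalDate;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class WhitelistMacroExpander
{
    private static final Pattern MACRO = Pattern.compile("\\$([A-Za-z0-9_.]+)\\$");

    private final Map<String, Supplier<String>> macros = new HashMap<>();

    public WhitelistMacroExpander()
    {
        // Built-in macros; users could register custom ones the same way.
        macros.put("lastFiveYears", () -> IntStream.rangeClosed(0, 4)
                .mapToObj(i -> String.valueOf(LocalDate.now().getYear() - i))
                .collect(Collectors.joining("|", "(", ")")));
        macros.put("lastTwoMonthsNames", () -> "(" + LocalDate.now().getMonth() + "|"
                + LocalDate.now().minusMonths(1).getMonth() + ")");
    }

    public void register(String name, Supplier<String> macro)
    {
        macros.put(name, macro);
    }

    // Expand every $macroName$ token into its runtime value; unknown macros are left as-is.
    public String expand(String whitelist)
    {
        Matcher matcher = MACRO.matcher(whitelist);
        StringBuffer expanded = new StringBuffer();
        while (matcher.find()) {
            Supplier<String> macro = macros.get(matcher.group(1));
            String value = macro != null ? macro.get() : matcher.group(0);
            matcher.appendReplacement(expanded, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(expanded);
        return expanded.toString();
    }
}
```

A custom macro like $com.myCompany.rubix.myCustomStoreSelector$ would then simply be another registered Supplier loaded from the user's jar.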

Improve internal Cache data structure

We need a cache data structure that satisfies the following properties:

  1. The cache's max weight is set according to "hadoop.cache.data.fullness.percentage"
  2. Each entry has a weight equal to the size its data file occupies on disk
  3. Once the cache reaches max weight, it starts evicting, and we delete the data files for the evicted entries

Right now we use a Guava cache to store the FileMetadata, which does not provide a dynamic weighing mechanism for entries, hence (2) cannot be achieved.

To work around this, this is what we do today. Each entry made to this cache is weighed by the following logic:
if the file is not present on disk, the weight of the entry is the size of the file;
if it is present on disk, the weight of the entry is the size the file occupies on disk.
We also evict aggressively: any entry older than 5 minutes is evicted, but if it is evicted due to time expiry, the data file is not deleted. Only when the eviction is due to size do we delete the data file. So, if an entry is evicted due to time expiry and a later request adds the same file's metadata entry again, the new entry is weighed by the actual size occupied on disk.
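A sketch of that structure using Guava, assuming a hypothetical FileMetadata that can report the size its data file currently occupies on disk. Weights are in kilobytes to stay within Guava's int weight range, and files are deleted only on size-based eviction, mirroring the workaround above.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.Weigher;
import java.io.File;
import java.util.concurrent.TimeUnit;

public class MetadataCacheSketch
{
    // Hypothetical stand-in for RubiX's FileMetadata.
    static class FileMetadata
    {
        final String localPath;
        final long sizeOnDiskBytes;

        FileMetadata(String localPath, long sizeOnDiskBytes)
        {
            this.localPath = localPath;
            this.sizeOnDiskBytes = sizeOnDiskBytes;
        }
    }

    static Cache<String, FileMetadata> build(long maxWeightKb)
    {
        return CacheBuilder.newBuilder()
                .maximumWeight(maxWeightKb)
                // Guava weighs an entry only once, at insertion time, which is why a
                // cache file that grows afterwards cannot be tracked accurately (point 2).
                .weigher((Weigher<String, FileMetadata>) (path, md) ->
                        (int) Math.max(1, md.sizeOnDiskBytes / 1024))
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .removalListener((RemovalListener<String, FileMetadata>) notification -> {
                    // Delete the data file only when eviction is due to size pressure;
                    // time-based expiry keeps the file so it can be re-registered later.
                    if (notification.getCause() == RemovalCause.SIZE) {
                        new File(notification.getValue().localPath).delete();
                    }
                })
                .build();
    }
}
```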

Double Caching

Given that we cannot achieve 100% locality-based scheduling in hadoop2, we should extend getFileBlockLocations to return multiple locations for each block. By default we should return 2 locations, and the number should be configurable.

Also, cache warm-up needs to be updated to handle this case: warm-up should now bring data into all of the locations, not just the one where the read is received. A sketch of the location selection is below.
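A minimal sketch of returning two replica locations per block using Hadoop's BlockLocation type; the node-selection scheme (hashing onto an ordered node list and taking the next nodes on the ring) is an assumption for the example.

```java
import java.util.List;
import org.apache.hadoop.fs.BlockLocation;

public class ReplicatedBlockLocations
{
    // Return 'replicas' candidate hosts for each block instead of a single owner.
    static BlockLocation locationFor(List<String> nodes, String path,
                                     long blockStart, long blockSize, int replicas)
    {
        String[] hosts = new String[replicas];
        long blockNumber = blockStart / blockSize;
        int owner = Math.floorMod((path + ":" + blockNumber).hashCode(), nodes.size());
        for (int i = 0; i < replicas; i++) {
            // The owner plus the next (replicas - 1) nodes hold copies of this block.
            hosts[i] = nodes.get((owner + i) % nodes.size());
        }
        return new BlockLocation(hosts, hosts, blockStart, blockSize);
    }
}
```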

Inconsistent state in cache - Not reading data from cache

Imagine file1 on the local disk, already partially (or completely) cached:

ls -lrt File1*
-rw-rw-rw- 1 yarn yarn 13631488 Feb 8 20:03 File1
-rw-r--r-- 1 yarn yarn 12 Feb 8 20:03 File1_mdfile

2018-02-08T20:04:30.046Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream         FileSize of remotePath : File1 is : 94607314
2018-02-08T20:04:30.087Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream         FileSize of remotePath : s3a://blah-bhal-path/File1.snappy.parquet is : 94607314
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Seek request, currentPos: 0 currentBlock: 0
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Seek to 94607306, setting block location 90
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Got Read, currentPos: 94607306 currentBlock: 90 bufferOffset: 0 length: 1048576
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream         Iterating from startBlock : 90 to End block : 92
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Sending cached block 90 to cachedReadRequestChain
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.ReadRequestChain  Request to add ReadRequest: [94371840, 94607314, 94607306, 94607314, 0]
2018-02-08T20:04:30.091Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.ReadRequestChain  Added ReadRequest: [94371840, 94607314, 94607306, 94607314, 0]
2018-02-08T20:04:30.092Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Reached EOF, returning
2018-02-08T20:04:30.092Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Executing Chains
2018-02-08T20:04:30.092Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachedReadRequestChain    Processing readrequest 94607306-94607314, length 8
2018-02-08T20:04:30.092Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachedReadRequestChain    CachedFileRead copied data [94607306 - 94607306] at buffer offset 0
2018-02-08T20:04:30.092Z        INFO    20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachedReadRequestChain    Read 0 bytes from cached file
2018-02-08T20:04:30.092Z        DEBUG   20180208_200429_00310_rztrq.11.2-36-93  com.qubole.rubix.core.CachingInputStream        Read 0 bytes
(The same Got Read / Read 0 bytes sequence for block 90 repeats several more times without making any progress.)

As we can see, the BookKeeper assumes the data is cached (from 94607306 to 94607314) and tries to read it from the cache, but there is nothing in the file, so nothing is filled into the input buffer.

In the BookKeeper log we see this:

18/02/08 22:23:44,854 WARN pool-3-thread-10537 bookkeeper.BookKeeper: Could not cache data: org.apache.thrift.shaded.TException: java.io.IOException: No such file or directory
        at com.qubole.rubix.bookkeeper.BookKeeper.getCacheStatus(BookKeeper.java:154)
        at com.qubole.rubix.bookkeeper.BookKeeper.readData(BookKeeper.java:275)
        at com.qubole.rubix.spi.BookKeeperService$Processor$readData.getResult(BookKeeperService.java:456)
        at com.qubole.rubix.spi.BookKeeperService$Processor$readData.getResult(BookKeeperService.java:441)
        at org.apache.thrift.shaded.ProcessFunction.process(ProcessFunction.java:39)
        at org.apache.thrift.shaded.TBaseProcessor.process(TBaseProcessor.java:39)
        at org.apache.thrift.shaded.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: No such file or directory
        at java.io.UnixFileSystem.createFileExclusively(Native Method)
        at java.io.File.createNewFile(File.java:1006)
        at com.qubole.rubix.bookkeeper.FileMetadata.refreshBitmap(FileMetadata.java:93)
        at com.qubole.rubix.bookkeeper.FileMetadata.isBlockCached(FileMetadata.java:112)
        at com.qubole.rubix.bookkeeper.BookKeeper.getCacheStatus(BookKeeper.java:138)
        ... 9 more

Presto error - no nodes available to run query

With RubiX enabled, running multiple queries on a Presto cluster causes a 'No nodes available to run query' error.

Cluster configs:

  • m3.xlarge
  • min=1
  • rubix enabled
  • presto version = 0.119, 0.142

Query:

SELECT * FROM tpcds_orc_100.store_sales where ss_customer_sk=1000

We ran 10 instances of the above query. Some were successful, others were not.

Query 20161118_124345_00020_98npx failed: No nodes available to run query
com.facebook.presto.spi.PrestoException: No nodes available to run query
at com.facebook.presto.util.Failures.checkCondition(Failures.java:75)
at com.facebook.presto.execution.NodeScheduler$NodeSelector.computeAssignments(NodeScheduler.java:278)
at com.facebook.presto.execution.scheduler.SplitPlacementPolicy.computeAssignments(SplitPlacementPolicy.java:41)
at com.facebook.presto.execution.scheduler.SourcePartitionedScheduler.schedule(SourcePartitionedScheduler.java:99)
at com.facebook.presto.execution.scheduler.SqlQueryScheduler.schedule(SqlQueryScheduler.java:295)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Internally, the Presto server is crashing because Java is running out of memory. More specifically, the RemoteReadRequestChain of the cache subsystem allocates two buffers, a prefix buffer and a suffix buffer, to hold block data that is outside the requested block range. This data is later written to disk along with the requested data to serve future I/O requests. In a concurrent environment, there can be thousands of these ReadRequests in RemoteReadRequestChain, which results in allocating many small ByteBuffers (each of block size). It is possible that, before the garbage collector kicks in and frees up this space, the JVM runs out of memory and raises a SIGINT signal. The JVM eventually shuts down, bringing down the Presto server.
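One possible mitigation (not something the issue itself proposes) is to bound how many prefix/suffix buffers can be outstanding at once, so concurrency is throttled instead of exhausting the heap. A sketch, with the block size and permit count as assumed parameters:

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

public class BoundedBufferAllocator
{
    private final Semaphore permits;
    private final int blockSize;

    public BoundedBufferAllocator(int maxOutstandingBuffers, int blockSize)
    {
        this.permits = new Semaphore(maxOutstandingBuffers);
        this.blockSize = blockSize;
    }

    // Blocks callers when too many prefix/suffix buffers are already in flight,
    // instead of letting every concurrent ReadRequest allocate its own.
    public ByteBuffer acquire() throws InterruptedException
    {
        permits.acquire();
        return ByteBuffer.allocate(blockSize);
    }

    // Must be called once the buffer's contents have been written to disk.
    public void release()
    {
        permits.release();
    }
}
```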

No caching for non-local reads

For engines like Hadoop, it appears to be impossible to achieve 100% locality-based scheduling. For such cases, to start with, we should not cache data to disk if the read is not local to the node.
