Storehaus

Storehaus is a library that makes it easy to work with asynchronous key-value stores. Storehaus is built on top of Twitter's Future.

Storehaus-Core

Storehaus's core module defines three traits: a read-only ReadableStore, a write-only WritableStore, and a read-write Store. The traits themselves are tiny:

package com.twitter.storehaus

import com.twitter.util.{ Closable, Future, Time }

trait ReadableStore[-K, +V] extends Closable {
  def get(k: K): Future[Option[V]]
  def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]]
  override def close(time: Time): Future[Unit] = Future.Unit
}

trait WritableStore[-K, -V] extends Closable {
  def put(kv: (K, V)): Future[Unit] = multiPut(Map(kv)).apply(kv._1)
  def multiPut[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Unit]] =
    kvs.map { kv => (kv._1, put(kv)) }
  override def close(time: Time): Future[Unit] = Future.Unit
}

trait Store[-K, V] extends ReadableStore[K, V] with WritableStore[K, Option[V]]

The ReadableStore trait uses the Future[Option[V]] return type to communicate one of three states about each value. A value is either

  • definitely present (the Future succeeds with Some(value)),
  • definitely missing (the Future succeeds with None), or
  • unknown due to some error, perhaps a timeout or a downed host (the Future fails).
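A minimal sketch of how a caller might tell the three states apart. To stay runnable without extra dependencies it uses the standard library's scala.concurrent.Future instead of Twitter's Future; the names LookupResult, Present, Missing, Unknown and classify are hypothetical and not part of Storehaus.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// Hypothetical ADT naming the three states a Future[Option[V]] can encode.
sealed trait LookupResult[+V]
final case class Present[V](value: V) extends LookupResult[V]
case object Missing extends LookupResult[Nothing]
final case class Unknown(error: Throwable) extends LookupResult[Nothing]

object LookupStates {
  // Folds a store lookup into one of the three states:
  // Some(v) -> present, None -> missing, failed future -> unknown.
  def classify[V](lookup: Future[Option[V]]): Future[LookupResult[V]] =
    lookup.transform[LookupResult[V]] {
      case Success(Some(v)) => Success(Present(v))
      case Success(None)    => Success(Missing)
      case Failure(e)       => Success(Unknown(e))
    }
}
```

Turning a failure into a value like this is only useful at the edge of a program; inside combinators it is usually better to let the failed Future propagate.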

The ReadableStore and Store companion objects provide a bunch of ways to create new stores. See the linked API documentation for more information.

Combinators

Coding with Storehaus's interfaces gives you access to a number of powerful combinators. The easiest way to access these combinators is by wrapping your store in an EnrichedReadableStore or an EnrichedStore. Storehaus provides implicit conversions inside of the ReadableStore and Store objects.

Here's an example of the mapValues combinator, useful for transforming the type of an existing store.

import com.twitter.storehaus.ReadableStore
import ReadableStore.enrich

// Create a ReadableStore from Int -> String:
val store = ReadableStore.fromMap(Map[Int, String](1 -> "some value", 2 -> "other value"))

// "get" behaves as expected:
store.get(1).get
// res5: Option[String] = Some(some value)

// calling "mapValues" with a function from V => NewV returns a new ReadableStore[K, NewV]:
val countStore: ReadableStore[Int, Int] = store.mapValues { s => s.size }

// This new store applies the function to every value on the way out:
countStore.get(1).get
// res6: Option[Int] = Some(10)
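The mapValues behaviour above can be pictured as a thin wrapper that transforms each value on the way out. The sketch below assumes a simplified, hypothetical SimpleReadableStore trait built on scala.concurrent.Future; it illustrates the idea and is not Storehaus's actual implementation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Simplified, hypothetical stand-in for ReadableStore (not the Storehaus trait).
trait SimpleReadableStore[K, V] { self =>
  def get(k: K): Future[Option[V]]

  // Wraps this store; fn runs on each value as it is read, so the
  // underlying data is never copied or rewritten.
  def mapValues[V2](fn: V => V2): SimpleReadableStore[K, V2] =
    new SimpleReadableStore[K, V2] {
      def get(k: K): Future[Option[V2]] = self.get(k).map(_.map(fn))
    }
}

object SimpleReadableStore {
  // Serves lookups from an immutable in-memory map.
  def fromMap[K, V](m: Map[K, V]): SimpleReadableStore[K, V] =
    new SimpleReadableStore[K, V] {
      def get(k: K): Future[Option[V]] = Future.successful(m.get(k))
    }
}
```

Because the function is applied lazily per read, a missing key stays None and a failed lookup stays failed; only present values are transformed.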

Storehaus-Algebra

The storehaus-algebra module adds the MergeableStore trait. If you're using key-value stores for aggregations, you're going to love MergeableStore.

package com.twitter.storehaus.algebra

import com.twitter.algebird.Monoid
import com.twitter.storehaus.Store
import com.twitter.util.Future

trait MergeableStore[-K, V] extends Store[K, V] {
  def monoid: Monoid[V]
  def merge(kv: (K, V)): Future[Option[V]] = multiMerge(Map(kv)).apply(kv._1)
  def multiMerge[K1 <: K](kvs: Map[K1, V]): Map[K1, Future[Option[V]]] =
    kvs.map { kv => (kv._1, merge(kv)) }
}

MergeableStore's merge and multiMerge are similar to put and multiPut; the difference is that values added with merge are added to the store's existing value and the previous value is returned. Because the addition is handled with a Semigroup[V] or Monoid[V] from Twitter's Algebird project, it's easy to write stores that aggregate Lists, decayed values, even HyperLogLog instances.
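To make the merge semantics concrete, here is a small in-memory sketch. Plus is a hypothetical stand-in for Algebird's Semigroup, and InMemoryMergeable is not a Storehaus class; the point is only that merge combines the incoming value with the existing one and returns the previous value.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for Algebird's Semigroup: an associative "plus".
trait Plus[V] { def plus(a: V, b: V): V }

// Simplified in-memory store (not Storehaus's API) showing merge semantics:
// the incoming value is added to the existing one, and the previous value is returned.
class InMemoryMergeable[K, V](implicit sg: Plus[V]) {
  private val data = mutable.Map.empty[K, V]

  def merge(kv: (K, V)): Future[Option[V]] = synchronized {
    val (k, v) = kv
    val previous = data.get(k)
    data(k) = previous.fold(v)(old => sg.plus(old, v))
    Future.successful(previous)
  }

  def get(k: K): Future[Option[V]] = synchronized { Future.successful(data.get(k)) }
}

object MergeExample {
  // Counters add up; lists concatenate.
  implicit val longPlus: Plus[Long] = new Plus[Long] { def plus(a: Long, b: Long) = a + b }
  implicit def listPlus[T]: Plus[List[T]] = new Plus[List[T]] { def plus(a: List[T], b: List[T]) = a ++ b }
}
```

Swapping the Plus instance is all it takes to aggregate a different type, which mirrors how a Semigroup or Monoid drives MergeableStore.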

The MergeableStore object provides a number of combinators on these stores. For ease of use, Storehaus provides an implicit conversion to an enrichment on MergeableStore. Access this by importing MergeableStore.enrich.

Other Modules

Storehaus provides a number of modules wrapping existing key-value stores. Enriching these key-value stores with Storehaus's combinators has been hugely helpful to us here at Twitter. Writing your jobs in terms of Storehaus stores makes it easy to test your jobs; use an in-memory JMapStore in testing and a MemcacheStore in production.

Planned Modules

Here's a list of modules we plan on implementing, with links to the GitHub issues tracking progress on these modules:

Documentation

To learn more and find links to tutorials and information around the web, check out the Storehaus Wiki.

The latest ScalaDocs are hosted on Storehaus's GitHub Project Page.

Contact

Discussion occurs primarily on the Storehaus mailing list. Issues should be reported on the GitHub issue tracker.

Get Involved + Code of Conduct

Pull requests and bug reports are always welcome!

We use a lightweight form of project governance inspired by the one used by Apache projects. Please see Contributing and Committership for our code of conduct and our pull request review process. The TL;DR is: send us a pull request, iterate on the feedback and discussion, and get a +1 from a Committer in order to get your PR accepted.

The current list of active committers (who can +1 a pull request) can be found here: Committers

A list of contributors to the project can be found here: Contributors

Maven

Storehaus modules are available on Maven Central. The current groupId and version for all modules are, respectively, "com.twitter" and 0.13.0.

Current published artifacts are

  • storehaus-core_2.11
  • storehaus-core_2.10
  • storehaus-algebra_2.11
  • storehaus-algebra_2.10
  • storehaus-memcache_2.11
  • storehaus-memcache_2.10
  • storehaus-mysql_2.11
  • storehaus-mysql_2.10
  • storehaus-hbase_2.11
  • storehaus-hbase_2.10
  • storehaus-redis_2.11
  • storehaus-redis_2.10
  • storehaus-dynamodb_2.11
  • storehaus-dynamodb_2.10
  • storehaus-kafka-08_2.11
  • storehaus-kafka-08_2.10
  • storehaus-mongodb_2.11
  • storehaus-mongodb_2.10
  • storehaus-elasticsearch_2.11
  • storehaus-elasticsearch_2.10
  • storehaus-leveldb_2.11
  • storehaus-leveldb_2.10
  • storehaus-http_2.11
  • storehaus-http_2.10
  • storehaus-cache_2.11
  • storehaus-cache_2.10
  • storehaus-testing_2.11
  • storehaus-testing_2.10

The suffix denotes the Scala version.
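For sbt builds, a dependency declaration along these lines should pull in the published artifacts; the two modules chosen here are just an example. sbt's %% operator appends the Scala binary version suffix (_2.10 or _2.11) for you.

```scala
// build.sbt: %% appends the Scala binary version suffix to each artifact name.
libraryDependencies ++= Seq(
  "com.twitter" %% "storehaus-core"    % "0.13.0",
  "com.twitter" %% "storehaus-algebra" % "0.13.0"
)
```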

Testing notes

We use Travis CI to set up any underlying stores (e.g. MySQL, Redis, Memcached) for the tests. In order for these tests to pass on your local machine, you may need additional setup.

MySQL tests

You will need MySQL installed on your local machine. Once it is installed, run the mysql commands listed in the .travis.yml file.

Redis tests

You will need Redis installed on your local machine. Redis comes bundled with an executable called redis-server for spinning up a server. The Storehaus Redis tests expect a server running with the factory defaults, reachable on localhost port 6379.

Memcached

You will need Memcached installed on your local machine and running on the default port 11211.

Authors

Contributors

Here are a few that shine among the many:

License

Copyright 2013 Twitter, Inc.

Licensed under the Apache License, Version 2.0.

storehaus's Issues

Sharding (Currying)

Given a Bijection[K, (K1, K2)], you can make a Bijection[K1 => ReadableStore[K2, V], ReadableStore[K, V]], and similarly for a Store.

We often call this sharding (example: K is an Integer, K1 = K % N, K2 = K / N).
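One direction of that bijection can be sketched as follows: a function from shard index to per-shard store becomes a single logical store over the full key space. Readable is a hypothetical minimal trait on scala.concurrent.Future, not Storehaus's ReadableStore, and the key split is the K % N, K / N example above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Minimal read-only store shape for this sketch (hypothetical, not Storehaus's trait).
trait Readable[K, V] { def get(k: K): Future[Option[V]] }

object Sharding {
  // Splits an Int key into (shard, key within shard): K1 = k % n, K2 = k / n.
  // k == k2 * n + k1 always holds; shard indices fall in 0 until n for non-negative keys.
  def split(n: Int)(k: Int): (Int, Int) = (k % n, k / n)

  // Builds one logical store out of per-shard stores, routing each key by its shard.
  def sharded[V](n: Int)(shards: Int => Readable[Int, V]): Readable[Int, V] =
    new Readable[Int, V] {
      def get(k: Int): Future[Option[V]] = {
        val (shard, inner) = split(n)(k)
        shards(shard).get(inner)
      }
    }
}
```

Since the split is invertible, each original key maps to exactly one (shard, inner key) pair, which is what lets the curried form and the flat form describe the same data.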

VersionedStore proposal

The notion of a VersionedStore would make it easy to create a distributed, read-only database backed by storehaus stores that load themselves from disk.

import com.twitter.storehaus.{ FutureOps, ReadableStore }
import com.twitter.util.{ Duration, Future, Timer }

/**
  * ReadableStore with some notion of versioning.
  */
trait VersionedStore[-K, +V] extends ReadableStore[K, (Long, V)] {
  def currentVersion: Option[Long]
}

/**
  * Fired when an updating versioned store is not yet loaded.
  */
class StoreNotLoadedException extends RuntimeException

/**
  * The typical not-yet-loaded versioned store. This store returns
  * exceptions for every get and multiGet.
  */
object ExceptionalStore extends VersionedStore[Any, Nothing] {
  override def get(k: Any): Future[Option[(Long, Nothing)]] =
    Future.exception(new StoreNotLoadedException)
  override def multiGet[K1 <: Any](ks: Set[K1]): Map[K1, Future[Option[(Long, Nothing)]]] =
    ks.map { k => (k, get(k)) }.toMap
  override val currentVersion: Option[Long] = None
}

class ConstantVersionedStore[-K, +V](store: ReadableStore[K, V], version: Long)
    extends VersionedStore[K, V] {
  override val currentVersion: Option[Long] = Some(version)

  // private[this] exempts this helper from the variance check on V.
  private[this] def prependVersion(f: Future[Option[V]]): Future[Option[(Long, V)]] =
    f.map { _.map { v => (version, v) } }

  override def get(k: K): Future[Option[(Long, V)]] = prependVersion(store.get(k))
  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[(Long, V)]]] =
    store.multiGet(ks).map { case (k, f) => (k, prependVersion(f)) }
}

/**
  * Versioned store implementation capable of updating itself to some
  * new version, potentially asynchronously. One use case here would be a
  * store that loads its backing dataset across the network from
  * HDFS. Every time a new version appeared on HDFS the watcher
  * process would switch the backing store behind the scenes.
  */
class UpdatingVersionedStore[-K, +V](updateInterval: Duration)(
    watcherProcess: Option[Long] => Future[Option[VersionedStore[K, V]]])(
    implicit timer: Timer) extends VersionedStore[K, V] {
  // private[this] keeps the mutable field out of variance checks; @volatile publishes swaps.
  @volatile private[this] var innerStore: Future[VersionedStore[K, V]] =
    Future.value(ExceptionalStore)

  // Version of the loaded store, once the inner future has resolved successfully.
  override def currentVersion: Option[Long] =
    innerStore.poll.flatMap { _.toOption }.flatMap { _.currentVersion }

  timer.schedule(updateInterval) {
    innerStore = innerStore
      .join(watcherProcess(currentVersion))
      .map { case (oldStore, optNewStore) => optNewStore.getOrElse(oldStore) }
  }

  override def get(k: K): Future[Option[(Long, V)]] = innerStore.flatMap { _.get(k) }
  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[(Long, V)]]] =
    FutureOps.liftFutureValues(ks, innerStore.map { _.multiGet(ks) })
}

Batching transformer

Suppose your client is going to make a bunch of gets, but you want to make sure those are batched into blocks of at least N keys, or flushed after a period of at most T, and sent to multiGet.

This makes it easy for callers to continue to use the get API, while the store batches their requests internally into multiGet calls.

We can do the same thing on write/merge (in fact, we have some of this for merging now).
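A minimal sketch of the size-triggered half of this idea, assuming a hypothetical BatchingReader wrapper over any multiGet-shaped function and scala.concurrent primitives. Callers keep calling get; once batchSize distinct keys are pending they are sent in one multiGet. The time bound T is not implemented here: a timer could call flush() periodically to provide it.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical batcher: lookups are buffered as promises and sent to multiGet
// once `batchSize` distinct keys are pending, or whenever flush() is called.
class BatchingReader[K, V](batchSize: Int)(multiGet: Set[K] => Map[K, Future[Option[V]]]) {
  private val pending = mutable.Map.empty[K, Promise[Option[V]]]

  def get(k: K): Future[Option[V]] = {
    val (result, ready) = synchronized {
      // Repeated gets for the same pending key share one promise.
      val p = pending.getOrElseUpdate(k, Promise[Option[V]]())
      (p.future, if (pending.size >= batchSize) drain() else Map.empty[K, Promise[Option[V]]])
    }
    dispatch(ready) // issued outside the lock
    result
  }

  def flush(): Unit = dispatch(synchronized(drain()))

  // Removes and returns all pending promises; callers must hold the lock.
  private def drain(): Map[K, Promise[Option[V]]] = {
    val batch = pending.toMap
    pending.clear()
    batch
  }

  // Issues one multiGet for the batch and completes each waiting promise.
  private def dispatch(batch: Map[K, Promise[Option[V]]]): Unit =
    if (batch.nonEmpty) {
      val results = multiGet(batch.keySet)
      batch.foreach { case (k, p) =>
        p.completeWith(results.getOrElse(k, Future.failed(new NoSuchElementException(s"no result for $k"))))
      }
    }
}
```

The same buffering shape would apply to writes or merges, with a multiPut or multiMerge function in place of multiGet.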

awkward placement of ConvertedStore

The convenience of using an Injection to convert a ConvertedStore's values has landed it in a somewhat awkward home: the algebra module. I get that we want to keep storehaus-core lean, but there's some residual strangeness: I can enrich my ReadableStore with convert, but my enriched Stores have no such luck, because the Store counterpart of convert lives under the algebra source tree. As a user, I'd expect that when I call convert on my extension of Store, I'd get back a Store. Instead I get back a ReadableStore.

I can take a stab at making this less awkward if anyone can point me in the right direction.

Add more "select" choices

The store returned by the "first" implementation on the ReadableStore and Store objects currently returns the first non-exceptional value. Some other nice combinators would be:

  • Return the fastest value, exceptional or non-exceptional (probably useless)
  • Return the first non-None value.
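The second combinator could be sketched like this: query every source concurrently and complete with the first Some to arrive. SelectFirst.firstDefined is a hypothetical name on scala.concurrent.Future, and the policy of treating failures like None (and returning None when nothing is found) is an assumption chosen for illustration.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SelectFirst {
  // Hypothetical combinator: completes with the first Some(v) from any source.
  // Failures and Nones are skipped; if no source yields a value, the result is None.
  def firstDefined[V](lookups: Seq[Future[Option[V]]])(implicit ec: ExecutionContext): Future[Option[V]] =
    if (lookups.isEmpty) Future.successful(None)
    else {
      val result = Promise[Option[V]]()
      val remaining = new AtomicInteger(lookups.size)
      lookups.foreach { f =>
        f.onComplete { t =>
          // Only a successful Some counts; trySuccess ignores later winners.
          t.foreach(_.foreach(v => result.trySuccess(Some(v))))
          // The last source to finish settles the result if nobody found a value.
          if (remaining.decrementAndGet() == 0) result.trySuccess(None)
        }
      }
      result.future
    }
}
```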

storehaus-storm

I see DRPCStore living here, along with the DecoderBolt and the relevant piece of our SinkBolt.

DRPCStore, for reference (needs BatchID removed, of course)

https://gist.github.com/sritchie/4951204

package com.twitter.summingbird.storm.store

import backtype.storm.utils.DRPCClient
import com.twitter.bijection.Bijection
import com.twitter.util.{ Future, FuturePool }
import com.twitter.storehaus.ReadableStore
import com.twitter.summingbird.util.RpcBijection
import java.util.concurrent.Executors

import Bijection.asMethod // enable "as" syntax

/**
 * Wrapper over backtype.storm.utils.DRPCClient.
 * This ReadableStore allows the user to perform online read-only
 * queries through Storm's DRPC mechanism.
 *
 *  @author Oscar Boykin
 *  @author Sam Ritchie
 */

object DRPCStore {
  def apply[Key, Value](nimbusHost: String, appID: String, port: Int = 3772)
  (implicit keyCodec: Bijection[Key, Array[Byte]], valCodec: Bijection[Value, Array[Byte]]) =
    new DRPCStore[Key,Value](nimbusHost, appID, port)
}

class DRPCStore[Key, Value](nimbusHost: String, appID: String, port: Int)
(implicit kBijection: Bijection[Key, Array[Byte]], vBijection: Bijection[Value, Array[Byte]])
extends ReadableStore[(Key, BatchID), Value] {
  val futurePool = FuturePool(Executors.newFixedThreadPool(4))

  val drpcClient = new DRPCClient(nimbusHost, port)

  implicit val pair2String: Bijection[(Key, BatchID), String] = RpcBijection.batchPair[Key]
  implicit val val2String: Bijection[Option[Value], String] = RpcBijection.option[Value]

  override def get(pair: (Key, BatchID)): Future[Option[Value]] =
    futurePool { drpcClient.execute(appID, pair.as[String]) }
      .map { _.as[Option[Value]] }
}

StorehausOutputFormat

I'm thinking of a Hadoop output format for generating many Storehaus persistences.

The output format would:

  • accept a number of shards and a shard function,
  • assign a shard to each key,
  • sort by (shard, key), and
  • create N Stores on disk.

Paired with #47, and the VersionedTap in dfs-datastores, each output Store would be a proper VersionedStore.
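The record-level steps of the proposed output format can be modelled in memory to show the intended ordering. ShardedOutput.partition is a hypothetical helper; a real implementation would run as a Hadoop job and write each shard to its own on-disk store rather than returning a Map.

```scala
// Hypothetical in-memory model of the proposed output format's steps.
object ShardedOutput {
  def partition[K, V](records: Seq[(K, V)], numShards: Int)(shardFn: K => Int)(
      implicit ord: Ordering[K]): Map[Int, Seq[(K, V)]] = {
    // Assign a shard to each key, keeping the index within 0 until numShards.
    val withShard = records.map { case (k, v) => (Math.floorMod(shardFn(k), numShards), k, v) }
    // Sort by (shard, key), then emit one sorted run per shard.
    withShard
      .sortBy { case (shard, k, _) => (shard, k) }
      .groupBy(_._1)
      .map { case (shard, rows) => shard -> rows.map { case (_, k, v) => (k, v) } }
  }
}
```

Each per-shard run comes out sorted by key, which is the layout a sorted on-disk store would want to be built from.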

IterableStore

If a store could produce an iterator over its pairs it'd be possible to write a StorehausInputFormat and consume storehaus data from scalding.

trait IterableStore[-K, V] extends ReadableStore[K, V] with Iterable[(K, V)]

Write-Only Store Abstraction

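It may be convenient to have a built-in or explicit abstraction for stores where the read and write paths are distinct. Currently you can have a readable store, but the next level of abstraction is Store, which is both readable and writable. It may be convenient to have a wrapping store that delegates all reads to a read-only store and all writes to a write-only store. I'm not sure what you would call this, but it could just be a wrapper over both sides that delegates on gets and puts.

import com.twitter.storehaus.{ ReadableStore, Store, WritableStore }
import com.twitter.util.Future

// a better name maybe???
class PartitionedStore[-K, V](readStore: ReadableStore[K, V], writeStore: WritableStore[K, Option[V]])
    extends Store[K, V] {
  override def get(k: K): Future[Option[V]] = readStore.get(k)
  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = readStore.multiGet(ks)
  override def put(kv: (K, Option[V])): Future[Unit] = writeStore.put(kv)
  override def multiPut[K1 <: K](kvs: Map[K1, Option[V]]): Map[K1, Future[Unit]] = writeStore.multiPut(kvs)
}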

Related, it may be convenient to define an explicit write/append only store for things like Stats and logs.

If this is not worthwhile, close this issue out.

For network service backed stores that wish to do this, they can always define interfaces for supplying two underlying clients for store operations: one configured for reading and one configured for writing.

FaultTolerantStores

If we have K stores, we can return a value when K/2 + 1 have given the same result.

We can do the same for writable stores, except that writes go to all stores in parallel, and then reads repair any missing or incorrect data.
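The read side of this idea can be sketched as a quorum over replica lookups: with k replicas, complete as soon as k/2 + 1 of them agree on the same Option[V]. Quorum.read is a hypothetical helper on scala.concurrent.Future; parallel writes and read repair are left out.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Quorum {
  // Hypothetical quorum read: completes once k/2 + 1 of the k replicas return the
  // same Option[V]; fails if every replica has answered without reaching quorum.
  def read[V](replicas: Seq[Future[Option[V]]])(implicit ec: ExecutionContext): Future[Option[V]] =
    if (replicas.isEmpty) Future.failed(new IllegalArgumentException("no replicas"))
    else {
      val needed = replicas.size / 2 + 1
      val votes = mutable.Map.empty[Option[V], Int]
      val answered = new AtomicInteger(0)
      val result = Promise[Option[V]]()
      replicas.foreach { f =>
        f.onComplete { t =>
          // Failed replicas cast no vote; successful ones vote for their answer.
          votes.synchronized {
            t.foreach { value =>
              val count = votes.getOrElse(value, 0) + 1
              votes(value) = count
              if (count >= needed) result.trySuccess(value)
            }
          }
          // Counted after voting, so this only fires once all votes are in.
          if (answered.incrementAndGet() == replicas.size)
            result.tryFailure(new IllegalStateException("no quorum reached"))
        }
      }
      result.future
    }
}
```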

Proposal for trait design

Just a sketch:

ReadableStore seems well posed.

I have the idea of SettableStore and UpdatableStore. These don't extend ReadableStore. Clearly, UpdatableStore extends SettableStore (just update and ignore the input). A SettableStore plus a ReadableStore can implement an UpdatableStore.

Lastly, I think we need to separate mutable and immutable. For instance, an LRU is a mutable ReadableStore (reading mutates it).

We can make a concurrent mutable version from the immutable one (just keep an internal var that we atomically update).
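The "internal var that we atomically update" approach can be sketched with java.util.concurrent.atomic.AtomicReference and a compare-and-set retry loop. ImmutableStore and ConcurrentStore are hypothetical names used only for illustration.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

// Hypothetical immutable store: every write returns a new store value.
final case class ImmutableStore[K, V](data: Map[K, V] = Map.empty[K, V]) {
  def get(k: K): Option[V] = data.get(k)
  // Some(v) sets the key, None deletes it, matching Store's Option[V] writes.
  def put(k: K, v: Option[V]): ImmutableStore[K, V] =
    copy(data = v.fold(data - k)(value => data.updated(k, value)))
}

// Concurrent mutable wrapper: one internal reference, swapped atomically with CAS.
final class ConcurrentStore[K, V](initial: ImmutableStore[K, V]) {
  private val ref = new AtomicReference(initial)

  def get(k: K): Option[V] = ref.get.get(k)

  def put(k: K, v: Option[V]): Unit = update(_.put(k, v))

  // Retries until no other thread replaced the store between our read and our swap.
  @tailrec
  private def update(f: ImmutableStore[K, V] => ImmutableStore[K, V]): Unit = {
    val current = ref.get
    if (!ref.compareAndSet(current, f(current))) update(f)
  }
}
```

Because f may run more than once under contention, it should be a pure function of the old store, which the immutable design gives for free.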

What do you think, Sam?

Add MissingValueException

We could use a helper function like this to pad out KV store return values with Future.exception to conform to our interface:

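import com.twitter.util.Future
class MissingValueException(msg: String) extends RuntimeException(msg)
def missing[K, V](keys: Set[K], scalaM: Map[K, _]): Map[K, Future[Option[V]]] =
  (keys -- scalaM.keySet).map { k => (k, Future.exception[Option[V]](new MissingValueException("Missing value for key: " + k))) }.toMap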

LoggingStore

A LoggingStore is a ReadableStore that triggers a side-effect into a Mergeable on every call to get or multiGet:

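import com.twitter.storehaus.ReadableStore
import com.twitter.storehaus.algebra.Mergeable
import com.twitter.util.{ Future, Time }

class LoggingStore[-K, +V](store: ReadableStore[K, V], mergeable: Mergeable[K, Future[Option[V]]])
    extends ReadableStore[K, V] {
  override def get(k: K): Future[Option[V]] = {
    val ret = store.get(k)
    mergeable.merge(k -> ret) // side effect: record the lookup
    ret
  }
  override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V]]] = {
    val ret = store.multiGet(ks)
    mergeable.multiMerge(ret)
    ret
  }
  override def close(time: Time): Future[Unit] = store.close(time)
}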

MemcacheStore Properties

Pending some merges from the redis branch (for resource cleanup) and the new memcache mergeables, I'd like to put in some unit tests for the memcache module.

MySqlValue typeclass

We need to encapsulate the text protocol of MySQL into a MySqlValue typeclass which, I guess, is a special kind of Injection to String, or perhaps an Injection to ChannelBuffer (or both).

Then we could make the MySqlStore[K: MySqlValue, V: MySqlValue].
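A rough sketch of what the String-based variant of such a typeclass could look like. MySqlValue's methods, the instances, and MySqlStoreSketch are all hypothetical; the real design might instead target ChannelBuffer or reuse Bijection's Injection. Decoding returns an Option because, like an Injection, not every string maps back to a value.

```scala
import scala.util.Try

// Hypothetical typeclass: an Injection-like pair of functions between a Scala type
// and MySQL's text protocol representation. Decoding can fail, so it returns an Option.
trait MySqlValue[T] {
  def toMySql(t: T): String
  def fromMySql(s: String): Option[T]
}

object MySqlValue {
  def apply[T](implicit ev: MySqlValue[T]): MySqlValue[T] = ev

  implicit val intValue: MySqlValue[Int] = new MySqlValue[Int] {
    def toMySql(t: Int): String = t.toString
    def fromMySql(s: String): Option[Int] = Try(s.toInt).toOption
  }

  implicit val stringValue: MySqlValue[String] = new MySqlValue[String] {
    def toMySql(t: String): String = t
    def fromMySql(s: String): Option[String] = Some(s)
  }
}

// With context bounds, a store can demand codecs for both key and value types.
class MySqlStoreSketch[K: MySqlValue, V: MySqlValue] {
  def encodeKey(k: K): String = MySqlValue[K].toMySql(k)
  def decodeValue(raw: String): Option[V] = MySqlValue[V].fromMySql(raw)
}
```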

retrying in storehaus should be a little more configurable

The recent addition of retry-able stores was nice but I think the way a client wants to retry should be a little more configurable.

Some interfaces where retry apis shine are

I talked to @n8han, the author of dispatch, about extracting his interface, which seems to be the most generic, and he said it'd be cool. I hate to see this kind of thing reinvented in multiple contexts, so I created a standalone retry library based on the one in reboot.

The problems I face now are

  1. The retry lib is based on Scala's Future interface, which is only available in 2.9.3 and 2.10+. Storehaus is published for 2.9.2 and 2.10, which means that if I wanted to adapt this interface to storehaus I could only do it in modules published for 2.10.

  2. The retry lib works with Scala futures, while storehaus is based on Twitter futures. Scala's interface looks like it borrowed a lot from Twitter's, so a bijection between the two should hopefully be the fix, but I'd have to figure out a good way of handling the 2.9.2 case.

  3. The retry lib needs to sleep asynchronously to simulate a pause between retries. Dispatch reboot uses Netty's HashedWheelTimer to put a block of code on a timer, while Twitter defines its own Timer interface (the zookeeper lib uses this). In order to integrate a generic retry lib, a common retry interface would be required. This should not be hard. Potentially something like this should suffice.

  4. Is storehaus open to dependencies not published by Twitter? Since the retry library is outside the Twitter lib lifecycle, it could go in its own storehaus module. I publish everything to Maven Central, so resolution should not be an issue.
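As an independent illustration (not the retry library or interface discussed in this issue), a configurable retry policy with a pluggable asynchronous pause might look like this. RetryPolicy and Retry are hypothetical names; the sleep parameter is where a Twitter Timer or a Netty HashedWheelTimer could be plugged in, and the sketch uses scala.concurrent.Future so it runs with the standard library alone.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical retry policy: how many retries, the first pause, and the backoff multiplier.
final case class RetryPolicy(maxRetries: Int, initialDelay: FiniteDuration, backoffFactor: Int = 2)

object Retry {
  // Runs attempt, and on failure waits via `sleep` before trying again, growing the
  // delay by backoffFactor each time. The last failure propagates once retries run out.
  def apply[T](policy: RetryPolicy, sleep: FiniteDuration => Future[Unit])(attempt: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    def loop(retriesLeft: Int, delay: FiniteDuration): Future[T] =
      attempt().recoverWith {
        case _ if retriesLeft > 0 =>
          sleep(delay).flatMap(_ => loop(retriesLeft - 1, delay * policy.backoffFactor.toLong))
      }
    loop(policy.maxRetries, policy.initialDelay)
  }
}
```

Keeping the pause as a function makes the policy testable without real waiting, as with the zero-delay sleep in a unit test.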

Time-based store buffers

BufferingStore might accept a Timer as well and schedule a flush on some regular interval. This would require synchronizing access to the promise map and the wrapped store.

ReadableStore.find

This should search readable stores in order for either:

  • the first non-exceptional return value, or
  • the first non-exceptional, non-None return value.
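Both variants can be sketched as one sequential search, where each source is only asked once the previous one failed (or, in the second mode, returned None). FindInOrder.find is a hypothetical helper over plain lookup functions on scala.concurrent.Future; returning the last source's outcome when every earlier one was skipped is an assumption of this sketch.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Success

object FindInOrder {
  // Hypothetical sequential search: skipNone = false gives the first non-exceptional
  // value; skipNone = true gives the first non-exceptional, non-None value.
  // The last source's outcome is returned as-is; an empty list yields None.
  def find[K, V](sources: List[K => Future[Option[V]]], skipNone: Boolean)(k: K)(
      implicit ec: ExecutionContext): Future[Option[V]] =
    sources match {
      case Nil         => Future.successful(None)
      case last :: Nil => last(k)
      case head :: tail =>
        head(k).transformWith[Option[V]] {
          case Success(Some(v))              => Future.successful(Some(v))
          case Success(None) if !skipNone    => Future.successful(None)
          case _                             => find(tail, skipNone)(k) // failed, or None while skipping
        }
    }
}
```

Unlike a parallel select, this only contacts later stores when earlier ones cannot answer, trading latency for load.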

Proposal: changing the store interface

Here's an idea:

trait ReadableStore[K,V] {
  def get(k: K): Future[Option[V]]
  def multiGet(ks: Set[K]): Future[Map[K, Future[Option[V]]]]
}

I tried to think of a better approach, and this seems like the minimal change that can support combining stores in a way that doesn't result in all-or-nothing semantics.

Merged Stores

Given a Seq[ReadableStore[K,V]], you can do a read and return the first non-None, non-exceptional value you get on get.

For multiGet, you can do the same for all the keys (keep waiting until you have all the results, or have exhausted the list).

This is similar to ReadableStore.select, but not quite the same.

Similarly, we send writes to all stores in a Seq[Store[K, V]].

We can model both of these as ReadableStore/Store and continue to combine with other combinators.
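The write half can be sketched as a fan-out wrapper that is itself a writable store, so it composes with other combinators. Writable and FanOutWritable are hypothetical names on scala.concurrent.Future, not Storehaus's WritableStore.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Minimal write-side shape for this sketch (hypothetical, not Storehaus's WritableStore).
trait Writable[K, V] { def put(kv: (K, V)): Future[Unit] }

// Sends every write to all underlying stores in parallel; the combined write
// succeeds only when all of them succeed, and fails if any of them fails.
final class FanOutWritable[K, V](stores: Seq[Writable[K, V]])(implicit ec: ExecutionContext)
    extends Writable[K, V] {
  def put(kv: (K, V)): Future[Unit] =
    Future.sequence(stores.map(_.put(kv))).map(_ => ())
}
```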

consider managing release notes with herald

A colleague and I curate a tumblr account for announcing release notes for Scala projects, using herald as the primary client. The recipe is simple: you drop a file named {version}.markdown in a notes directory under your source tree. Just run herald in the directory containing the notes dir, and herald will figure out the latest release, generate a preview, and post to notes. This has been a great way for me to keep track of what changes in my projects and document it for my users in a common location. Would this be something storehaus would be interested in doing? If you guys can pass along an email associated with a tumblr account, I can get you up and running.