Spark-HBase Connector

Build status

This library lets your Apache Spark application interact with Apache HBase using a simple and elegant API.

If you want to read and write data to HBase, you no longer need to use the Hadoop API: you can just use Spark.

Including the library

The spark-hbase-connector is available in the Sonatype repository. Just add the following dependency in sbt:

libraryDependencies += "it.nerdammer.bigdata" % "spark-hbase-connector_2.10" % "1.0.2"

The Maven style version of the dependency is:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.2</version>
</dependency>

If you don't want to use a published sbt or Maven dependency, you can also check out this GitHub repo and execute the following command from the root folder:

sbt package

sbt will create the library JAR under target/scala-2.10.

Note that the library depends on the following artifacts:

libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"

libraryDependencies +=  "org.apache.hbase" % "hbase-common" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))

libraryDependencies +=  "org.apache.hbase" % "hbase-client" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))

libraryDependencies +=  "org.apache.hbase" % "hbase-server" % "1.0.3" excludeAll(ExclusionRule(organization = "javax.servlet", name="javax.servlet-api"), ExclusionRule(organization = "org.mortbay.jetty", name="jetty"), ExclusionRule(organization = "org.mortbay.jetty", name="servlet-api-2.5"))


libraryDependencies +=  "org.scalatest" % "scalatest_2.10" % "2.2.4" % "test"

Before checking out, also check that the current branch is passing all tests on Travis CI (see the "build" icon above).

Basic Hostname Configuration

If you need to contact a host other than localhost, set it in the spark.hbase.host system property. You can do this through the JVM settings (-Dspark.hbase.host=the_zookeeper_quorum_host) or with the --conf option if you are using the Spark shell.

Writing to HBase (Basic)

Writing to HBase is very easy. Remember to import the implicit conversions:

import it.nerdammer.spark.hbase._

You just have to create a sample RDD, such as the following one:

val rdd = sc.parallelize(1 to 100)
            .map(i => (i.toString, i+1, "Hello"))

This RDD is made of tuples like ("1", 2, "Hello") or ("27", 28, "Hello"). The first element of each tuple is treated as the row id; the others are assigned to the columns, in order.

rdd.toHBaseTable("mytable")
    .toColumns("column1", "column2")
    .inColumnFamily("mycf")
    .save()

You are done. HBase now contains 100 rows in table mytable, each row containing two values for columns mycf:column1 and mycf:column2.
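To make the mapping concrete, the following plain-Scala sketch (no Spark or HBase needed) mimics how a tuple is split into a row id and named cells. TupleMappingSketch and toCells are hypothetical names used only for illustration; they are not part of the connector and say nothing about how it stores values internally.

```scala
// Illustration only: models the tuple-to-row mapping described above.
// TupleMappingSketch and toCells are hypothetical, not connector API.
object TupleMappingSketch {

  // Splits a record into (row id, Map of "family:column" -> value).
  // The first element is the row id; the rest pair up with the column names in order.
  def toCells(record: Product, family: String, columns: Seq[String]): (Any, Map[String, Any]) = {
    val fields = record.productIterator.toList
    require(fields.size == columns.size + 1, "expected one row id plus one value per column")
    val cells = columns.map(c => s"$family:$c").zip(fields.tail).toMap
    (fields.head, cells)
  }

  def main(args: Array[String]): Unit = {
    val (rowId, cells) = toCells(("27", 28, "Hello"), "mycf", Seq("column1", "column2"))
    println(s"row id = $rowId, cells = $cells")
  }
}
```

The require call reflects the rule that the tuple must carry exactly one more element than the number of columns passed to toColumns.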

Reading from HBase (Basic)

Reading from HBase is even easier. Remember to import the implicit conversions:

import it.nerdammer.spark.hbase._

If you want to read the data written in the previous example, you just need to write:

val hBaseRDD = sc.hbaseTable[(String, Int, String)]("mytable")
    .select("column1", "column2")
    .inColumnFamily("mycf")

Now hBaseRDD contains all the data found in the table. Each object in the RDD is a tuple containing (in order) the row id, the corresponding value of column1 (Int) and column2 (String).

If you don't want the row id and only want to see the columns, just remove the first element from the tuple type:

val hBaseRDD = sc.hbaseTable[(Int, String)]("mytable")
    .select("column1", "column2")
    .inColumnFamily("mycf")

This way, only the columns that you have chosen will be selected.

Other Topics

Filtering

It is possible to filter the results by prefixes of row keys. Filtering also supports additional salting prefixes (see the salting section).

val rdd = sc.hbaseTable[(String, String)]("table")
      .select("col")
      .inColumnFamily(columnFamily)
      .withStartRow("00000")
      .withStopRow("00500")

The example above retrieves all rows having a row key greater than or equal to 00000 and lower than 00500. The options withStartRow and withStopRow can also be used separately.
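The bounds form a half-open range: the start row is included and the stop row is excluded. A minimal plain-Scala sketch of that rule follows; RowRangeSketch and inRange are hypothetical names, and String.compareTo stands in for HBase's byte-wise key ordering, which agrees with it for the ASCII keys used here.

```scala
// Illustration only: models the half-open row-key range [startRow, stopRow).
// RowRangeSketch is a hypothetical name, not connector API.
object RowRangeSketch {

  // True when the key falls inside the range; a missing bound is treated as open.
  def inRange(key: String, startRow: Option[String], stopRow: Option[String]): Boolean =
    startRow.forall(s => key.compareTo(s) >= 0) && stopRow.forall(s => key.compareTo(s) < 0)

  def main(args: Array[String]): Unit = {
    val keys = Seq("00000", "00250", "00499", "00500", "01000")
    // Keeps the keys from "00000" up to, but not including, "00500"
    println(keys.filter(k => inRange(k, Some("00000"), Some("00500"))))
  }
}
```

Because the comparison is lexicographic, row keys should be padded to a fixed width (as in 00000 and 00500) for the range to behave like a numeric one.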

Managing Empty Columns

Empty columns are managed by using Option[T] types:

val rdd = sc.hbaseTable[(Option[String], String)]("table")
      .select("column1", "column2")
      .inColumnFamily(columnFamily)

rdd.foreach(t => {
    if(t._1.nonEmpty) println(t._1.get)
})

You can use the Option[T] type every time you are not sure whether a given column is present in your HBase RDD.
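Pattern matching is often a safer way to consume such values than calling get. The sketch below uses a plain Seq as a stand-in for the RDD so it runs without Spark; RDDs offer the same map method and a collect overload that takes a partial function. OptionColumnsSketch and the sample rows are made up for illustration.

```scala
// Illustration only: a Seq stands in for an RDD[(Option[String], String)].
object OptionColumnsSketch {

  // None marks a row where column1 is missing.
  val rows: Seq[(Option[String], String)] =
    Seq((Some("a"), "x"), (None, "y"), (Some("c"), "z"))

  // Keep only the rows where column1 is present, unwrapping the value.
  val present: Seq[(String, String)] =
    rows.collect { case (Some(c1), c2) => (c1, c2) }

  // Or keep every row and substitute a default for the missing value.
  val withDefault: Seq[(String, String)] =
    rows.map { case (c1, c2) => (c1.getOrElse("n/a"), c2) }
}
```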

Using different column families

Different column families can be used both when reading and when writing an RDD.

data.toHBaseTable("mytable")
      .toColumns("column1", "cf2:column2")
      .inColumnFamily("cf1")
      .save()

In the example above, cf1 refers only to column1, because cf2:column2 is already fully qualified.

val count = sc.hbaseTable[(String, String)]("mytable")
      .select("cf1:column1", "column2")
      .inColumnFamily("cf2")
      .count

In the reading example above, the default column family cf2 applies only to column2.
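The resolution rule in both examples can be summarised as: a name that already contains a family keeps it, and any other name gets the default. A small sketch of that rule, where ColumnFamilySketch and qualify are hypothetical helpers and not the connector's own code:

```scala
// Illustration only: models how a default column family applies to column names.
object ColumnFamilySketch {

  // Returns "family:column". A name that already contains ':' is left unchanged;
  // otherwise the default family is prepended.
  def qualify(column: String, defaultFamily: String): String =
    if (column.contains(":")) column else s"$defaultFamily:$column"

  def main(args: Array[String]): Unit = {
    // Resolves the two column names from the writing example
    println(Seq("column1", "cf2:column2").map(qualify(_, "cf1")))
  }
}
```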

Setting the HBase host

The HBase ZooKeeper quorum host can be set in multiple ways.

(1) Passing the host to the spark-submit command:

spark-submit --conf spark.hbase.host=thehost ...

(2) If you have access to the JVM parameters:

java -Dspark.hbase.host=thehost -jar ....

(3) In Scala code:

val sparkConf = new SparkConf()
...
sparkConf.set("spark.hbase.host", "thehost")
...
val sc = new SparkContext(sparkConf)

Usage in Spark Streaming

The connector can be used in Spark Streaming applications with the same API.

// stream is a DStream[(Int, Int)]

stream.foreachRDD(rdd =>
    rdd.toHBaseTable("table")
      .inColumnFamily("cf")
      .toColumns("col1")
      .save()
    )

Advanced

Salting Prefixes

Salting is supported in reads and writes. Only String row ids are supported at the moment, so salting prefixes must also be of type String.

sc.parallelize(1 to 1000)
      .map(i => (pad(i.toString, 5), "A value"))
      .toHBaseTable(table)
      .inColumnFamily(columnFamily)
      .toColumns("col")
      .withSalting((0 to 9).map(s => s.toString))
      .save()

In the example above, each row id is composed of 5 digits: from 00001 to 01000. The salting property adds a random digit in front, so you will have records like: 800001, 600031, ...
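The pad function used in the example is not defined in this README. A possible definition is sketched below, together with a model of how a salted key is composed. Both helpers are hypothetical. In particular, deriving the prefix from the row id's hash is an assumption made here so that the result is repeatable; the connector may choose prefixes differently.

```scala
// Illustration only: SaltingSketch is a hypothetical name, not connector API.
object SaltingSketch {

  // Left-pads a string with zeros up to the given length
  // (a guess at what pad(i.toString, 5) does in the example above).
  def pad(s: String, length: Int): String =
    "0" * math.max(0, length - s.length) + s

  // Puts one of the salting prefixes in front of the row id.
  // Assumption: the prefix is picked from the hash of the row id.
  def salted(rowId: String, prefixes: Seq[String]): String = {
    val index = math.abs(rowId.hashCode % prefixes.size)
    prefixes(index) + rowId
  }

  def main(args: Array[String]): Unit = {
    val prefixes = (0 to 9).map(_.toString)
    println(salted(pad("31", 5), prefixes))
  }
}
```

Whatever the selection rule, spreading keys over several prefixes is what keeps sequential row ids from landing in the same region.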

When reading the RDD, you just have to declare the salting prefixes used in the table and ignore them when setting bounds (startRow or stopRow). The library takes care of dealing with salting.

val rdd = sc.hbaseTable[String](table)
      .select("col")
      .inColumnFamily(columnFamily)
      .withStartRow("00501")
      .withSalting((0 to 9).map(s => s.toString))
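Conceptually, a bound on the unsalted row id corresponds to one bound per salting prefix. How the connector implements this internally is not described here; the sketch below is only a plausible model, with SaltedReadSketch, startRows and unsalt as hypothetical names.

```scala
// Illustration only: a model of reading salted keys, not the connector's code.
object SaltedReadSketch {

  // One physical start row per salting prefix for a single logical start row.
  def startRows(startRow: String, prefixes: Seq[String]): Seq[String] =
    prefixes.map(_ + startRow)

  // Removes the salting prefix, assuming all prefixes have the same length.
  def unsalt(saltedKey: String, prefixLength: Int): String =
    saltedKey.drop(prefixLength)

  def main(args: Array[String]): Unit = {
    val prefixes = (0 to 9).map(_.toString)
    println(startRows("00501", prefixes))
  }
}
```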

Custom Mapping with Case Classes

Custom mapping can be used in place of the default tuple-mapping technique. Just define a case class for your type:

case class MyData(id: Int, prg: Int, name: String)

Then define an object that contains an implicit writer and an implicit reader for your type. Start with the writer:

import org.apache.hadoop.hbase.util.Bytes

implicit def myDataWriter: FieldWriter[MyData] = new FieldWriter[MyData] {
    // The first element is the row id; the others follow the order of `columns`
    override def map(data: MyData): HBaseData =
      Seq(
        Some(Bytes.toBytes(data.id)),
        Some(Bytes.toBytes(data.prg)),
        Some(Bytes.toBytes(data.name))
      )

    override def columns = Seq("prg", "name")
}

Do not forget to override the columns method.

Then, you can define an implicit reader:

implicit def myDataReader: FieldReader[MyData] = new FieldReader[MyData] {
    override def map(data: HBaseData): MyData = MyData(
      id = Bytes.toInt(data.head.get),
      prg = Bytes.toInt(data.drop(1).head.get),
      name = Bytes.toString(data.drop(2).head.get)
    )

    override def columns = Seq("prg", "name")
}
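The writer and reader above rely on position: the first element is the row id and the remaining ones follow the order of columns. The sketch below reproduces that layout with the JDK only, so it can be tried without HBase. It assumes that HBaseData behaves like a sequence of optional byte arrays, as the examples suggest, and uses ByteBuffer and UTF-8 in place of Bytes, which encodes Int values as 4 big-endian bytes and String values as UTF-8. PositionalCodecSketch and its members are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Illustration only: a JDK-only model of the positional mapping.
object PositionalCodecSketch {

  // Stand-in for HBaseData (assumed shape, see the note above).
  type Cells = Seq[Option[Array[Byte]]]

  def intBytes(i: Int): Array[Byte] = ByteBuffer.allocate(4).putInt(i).array()

  // Row id first, then the values in column order.
  def encode(id: Int, prg: Int, name: String): Cells =
    Seq(Some(intBytes(id)), Some(intBytes(prg)), Some(name.getBytes(UTF_8)))

  // Returns None instead of throwing when any cell is missing.
  def decode(cells: Cells): Option[(Int, Int, String)] = cells match {
    case Seq(Some(id), Some(prg), Some(name)) =>
      Some((ByteBuffer.wrap(id).getInt, ByteBuffer.wrap(prg).getInt, new String(name, UTF_8)))
    case _ => None
  }
}
```

Unlike the reader above, which calls get on every element, decode treats a missing cell as an absent result. Which behaviour is preferable depends on whether missing columns are expected in the table.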

Once you are done, make sure that the implicits are imported and that they do not produce a non-serializable task (Spark checks this at runtime).
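Spark serializes task closures with Java serialization, so one way to catch the problem early is to try serializing a converter yourself before submitting a job. The helper below is a hypothetical sketch of such a check and is not part of the connector.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Illustration only: a quick Java-serialization check.
object SerializationCheckSketch {

  // Returns true when Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Declaring the implicits in a standalone top-level object, rather than inside a class that also holds the SparkContext, usually helps keep non-serializable references out of the closure.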

You can now use your converters easily:

val data = sc.parallelize(1 to 100).map(i => new MyData(i, i, "Name" + i.toString))
// data is an RDD[MyData]

data.toHBaseTable("mytable")
  .inColumnFamily("mycf")
  .save()

val read = sc.hbaseTable[MyData]("mytable")
  .inColumnFamily("mycf")

The converters above are low level and use the HBase API directly. Since this connector provides many predefined converters for simple and complex types, you will probably want to reuse them. The FieldReaderProxy and FieldWriterProxy API has been created for this purpose.

High-level converters using FieldWriterProxy

You can create a new FieldWriterProxy by declaring a conversion from your custom type to a predefined type. In this case, the predefined type is a tuple composed of three basic fields:

// MySimpleData is a case class

implicit def myDataWriter: FieldWriter[MySimpleData] = new FieldWriterProxy[MySimpleData, (Int, Int, String)] {

  override def convert(data: MySimpleData) = (data.id, data.prg, data.name) // the first element is the row id

  override def columns = Seq("prg", "name")
}

The corresponding FieldReaderProxy converts back a tuple of three basic fields into objects of class MySimpleData:

implicit def myDataReader: FieldReader[MySimpleData] = new FieldReaderProxy[(Int, Int, String), MySimpleData] {

  override def columns = Seq("prg", "name")

  override def convert(data: (Int, Int, String)) = MySimpleData(data._1, data._2, data._3)
}

Note that we have not used the HBase API. Currently, the proxies can read and write tuples of up to 22 fields (including the row id).
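The two convert methods should be inverses of each other. The text only says that MySimpleData is a case class, so the definition below is an assumption, and ProxyConversionSketch is a hypothetical object that isolates the conversions so the round trip can be checked without Spark or HBase.

```scala
// Assumed definition: the fields are inferred from the convert methods above.
case class MySimpleData(id: Int, prg: Int, name: String)

// Illustration only: the same conversions as in the proxies, as plain functions.
object ProxyConversionSketch {

  // Mirrors FieldWriterProxy.convert: the first element is the row id.
  def toTuple(data: MySimpleData): (Int, Int, String) =
    (data.id, data.prg, data.name)

  // Mirrors FieldReaderProxy.convert.
  def fromTuple(t: (Int, Int, String)): MySimpleData =
    MySimpleData(t._1, t._2, t._3)
}
```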
