hortonworks-spark / shc
The Apache Spark - Apache HBase Connector is a library to support Spark accessing HBase tables as an external data source or sink.
License: Apache License 2.0
Hi, I have the same issue as this Stack Overflow thread:
Spark-HBase ==> How to load different columns to the same column family in HBase using spark?
I keep getting errors saying "column family already exists".
Can someone explain how I can create a new table with a catalog that has multiple columns per column family?
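For reference, this is the kind of catalog I have in mind. A sketch with made-up table and field names: two fields share the column family "personal" and two share "professional":

```scala
// Hypothetical catalog (illustrative names, not from the repo's examples):
// "name" and "city" map into the same column family "personal",
// "role" and "salary" into "professional".
val catalog = s"""{
  |"table":{"namespace":"default", "name":"employee"},
  |"rowkey":"key",
  |"columns":{
  |"id":{"cf":"rowkey", "col":"key", "type":"string"},
  |"name":{"cf":"personal", "col":"name", "type":"string"},
  |"city":{"cf":"personal", "col":"city", "type":"string"},
  |"role":{"cf":"professional", "col":"role", "type":"string"},
  |"salary":{"cf":"professional", "col":"salary", "type":"string"}
  |}
  |}""".stripMargin
```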
After merging #28, we should bring the examples up to date.
I was trying out the connector to work with HBase 1.0.0 and it fails with
ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/util/DataTypeParser$
java.lang.NoClassDefFoundError: org/apache/spark/sql/catalyst/util/DataTypeParser$
Could you please specify the minimum HBase requirement for the project?
Hi
I'm unable to insert data into HBase, because my job is failing with an exception:
App > 16/07/06 12:16:58 task-result-getter-1 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 3, ip-172-31-19-56.eu-west-1.compute.internal): java.lang.IllegalArgumentException: Can not create a Path from a null string
App > at org.apache.hadoop.fs.Path.checkPathArg(Path.java:125)
App > at org.apache.hadoop.fs.Path.(Path.java:137)
App > at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
App > at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
App > at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
App > at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
App > at java.lang.Thread.run(Thread.java:745)
my hbase-site.xml is:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>zookeeper.recovery.retry</name>
<value>3</value>
</property>
<property>
<name>hbase.regionserver.info.port</name>
<value>16030</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>remote_IP</value>
</property>
<property>
<name>hbase.rootdir</name>
<value>hdfs://remote_IP:9000/hbase</value>
</property>
<property>
<name>hbase.fs.tmp.dir</name>
<value>/user/${user.name}/hbase-staging</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/tmp/hadoop-${user.name}</value>
</property>
<!-- Put any other property here, it will be used -->
</configuration>
My HBase version:
HBase Version 1.0.0, revision=984db9a1cae088b996e997db9ce83f6d4bd565ad
Any suggestions?
Hi,
I am not able to create a table, as it cannot connect to my cluster using ZooKeeper.
16/08/08 12:30:13 INFO ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
16/08/08 12:30:13 WARN RecoverableZooKeeper: Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
16/08/08 12:30:13 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
Could you please tell me where I can put/configure my hbase-site.xml and core-site.xml? Currently I have placed them in the resources folder of my project.
Thanks
It is not clear how to handle the binary type, because in some cases it uses this function and in other cases it does not. See #34 for more.
I am writing a little tool to move some data from Hive to HBase and I used SHC in this context.
I finished coding the tool (it works nicely!) and am now taking care of the details.
SHC does not seem to take the namespace I give it into account.
I created a small Hive table:
case class AgeAndName(age:Int, name:String)
val myseq = 1 to 30000 map(x => AgeAndName(x, s"Name$x is a good name "))
val myDf = sc.parallelize(myseq).toDF
myDf.write.saveAsTable("person.ageAndName")
(this works provided the DB person exists in Hive).
While processing, I am turning the case class into an Avro record and taking the age as the id for my rowkey in HBase (the consistency of the example is not relevant here ;)).
When inserting the data, I am providing the options as a Map[String,String] containing the catalog. Here are the YARN logs that show their content:
17/02/21 14:00:46 INFO SHCHelper$: Generated catalog: {
"table":{"namespace":"person", "name":"ageAndName", "tableCoder":"PrimitiveType"},
"rowkey":"age",
"columns":{
"age":
{"cf":"rowkey", "col":"age", "type":"int"} ,
"record":{"cf":"record", "col":"record", "type":"binary"}
}
}
And these are the logged options passed to the writer:
17/02/21 14:00:48 INFO Hive2Hbase$: This options are passed to the writer: namespace -> person,catalog -> {
"table":{"namespace":"person", "name":"ageAndName", "tableCoder":"PrimitiveType"},
"rowkey":"age",
"columns":{
"age":
{"cf":"rowkey", "col":"age", "type":"int"} ,
"record":{"cf":"record", "col":"record", "type":"binary"}
}
},newtable -> 5
The writing works nicely with SHC, but when using the hbase shell to check the result, here is what I can see:
hbase(main):072:0* list_namespace
NAMESPACE
default
hbase
person
3 row(s) in 0.0180 seconds
hbase(main):073:0> list_namespace_tables 'person'
TABLE
0 row(s) in 0.0110 seconds
hbase(main):074:0> list_namespace_tables 'default'
TABLE
ageAndName
1 row(s) in 0.0110 seconds
I expected the namespace person to store the table ageAndName, not the default namespace.
So why is that not the case?
I am working with this dependency:
libraryDependencies += "com.hortonworks" % "shc-core" % "1.0.1-1.6-s_2.10"
Constant logging impacts performance. Instead of logging, what about adding a new API that users can call at any time for statistics information, like total connection creation requests, total connection close requests, currently alive connections, number of connections that have actually been created, etc.? Users can do whatever they want with it: print it, log it, or just make some assertions.
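A minimal sketch of what such a statistics API could look like, assuming a snapshot case class plus atomic counters; the names below are made up for illustration, not part of SHC:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stats a connection cache could expose; SHC does not
// currently provide an API like this.
case class CacheStats(createRequests: Long,
                      closeRequests: Long,
                      aliveConnections: Long,
                      actualCreations: Long)

object ConnectionStats {
  private val createRequests = new AtomicLong()
  private val closeRequests = new AtomicLong()
  private val actualCreations = new AtomicLong()

  // A cache hit satisfies the request without actually creating a connection.
  def recordCreateRequest(cacheHit: Boolean): Unit = {
    createRequests.incrementAndGet()
    if (!cacheHit) actualCreations.incrementAndGet()
  }

  def recordCloseRequest(): Unit = closeRequests.incrementAndGet()

  // Users call this whenever they like and print/log/assert on the result.
  def snapshot: CacheStats = CacheStats(
    createRequests.get(),
    closeRequests.get(),
    createRequests.get() - closeRequests.get(),
    actualCreations.get())
}
```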
Niket's 48cffbe reduces the time taken by the 'In' filter, which can be measured with the "IN filter stack overflow" test case.
We need to figure out why the "or" function is taking so long, and then update the implementation of the 'In' filter.
Hello,
I am experiencing a NullPointerException running the basic HBaseSource example on the HDP-2.5 Sandbox.
I built an assembly and here is my submit:
/usr/hdp/current/spark-client/bin/spark-submit --driver-memory 1024m --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn --deploy-mode client --executor-memory 512m --num-executors 4 --files /usr/hdp/current/hbase-master/conf/hbase-site.xml /root/affinytix/tunnel/affinytix-test-tunnel-assembly-1.0.0-SNAPSHOT.jar |& tee /tmp/test-kafka-sparkstreaming.log
And here is the stack trace (it seems to occur while saving, i.e. the first phase of the demo that populates the table):
16/10/02 08:39:37 INFO ZooKeeperRegistry: ClusterId read in ZooKeeper is null
Exception in thread "main" java.lang.RuntimeException: java.lang.NullPointerException
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:208)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:193)
at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:89)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableAvailable(ConnectionManager.java:985)
at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1399)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.createTable(HBaseRelation.scala:87)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:58)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:90)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher.getMetaReplicaNodes(ZooKeeperWatcher.java:395)
at org.apache.hadoop.hbase.zookeeper.MetaTableLocator.blockUntilAvailable(MetaTableLocator.java:553)
at org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation(ZooKeeperRegistry.java:61)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateMeta(ConnectionManager.java:1185)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1152)
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:300)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:151)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
... 24 more
Any idea about the cause?
I have looked at the NullPointerException reports in earlier issues, but I am already using both hbase-site.xml and yarn client mode to execute the example, so I don't really understand.
Thanks for helping
Cannot get all versions of all row keys stored in a table: https://community.hortonworks.com/questions/83870/spark-hbase-connector-how-to-getscan-all-versions.html
It would be great to have a formal issue template like Apache Spark's.
While trying to save a DataFrame to HBase I'm getting an error:
Caused by: java.lang.IncompatibleClassChangeError: Found class org.apache.spark.sql.catalyst.expressions.MutableRow, but interface was expected
at org.apache.spark.sql.execution.datasources.hbase.Utils$.setRowCol(Utils.scala:61)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$buildRow$1.apply(HBaseTableScan.scala:120)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$buildRow$1.apply(HBaseTableScan.scala:101)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:101)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:190)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:180)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:140)
at org.apache.spark.sql.execution.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:130)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
It seems like SHC does not support newer versions of the Catalyst API.
Is there any quick fix or workaround for this issue?
It is especially relevant for the current version of Spark, 1.6.2, in HDP 2.5.
Can you add support for Spark 1.5, or is 1.6 backwards compatible with Spark 1.5? A package doesn't seem to be available for 1.5.
Hi,
I tried to run a Spark job to read from/write to HBase in a Hortonworks cluster secured by Kerberos, and passing the hbase-site.xml with --files never worked for me.
As described in https://community.hortonworks.com/content/supportkb/48988/how-to-run-spark-job-to-interact-with-secured-hbas.html (point 2), the only solution which worked was to copy the hbase-site.xml directly in the Spark conf directory of our Edge node (/etc/spark/conf).
Maybe I'm wrong and it is cluster dependent, but it might be good to suggest this solution in the README. I could do a PR if needed.
Regards,
I can see the following code in HBaseRelation.scala
if (catalog.numReg > 3) {
  val tName = TableName.valueOf(catalog.name)
  val cfs = catalog.getColumnFamilies
  val connection = HBaseConnectionCache.getConnection(hbaseConf)
  // Initialize hBase table if necessary
  val admin = connection.getAdmin
  // The names of tables which are created by the Examples has prefix "shcExample"
  if (admin.isTableAvailable(tName) && tName.toString.startsWith("shcExample")) {
    admin.disableTable(tName)
    admin.deleteTable(tName)
  }
  if (!admin.isTableAvailable(tName)) {
    val tableDesc = new HTableDescriptor(tName)
    cfs.foreach { x =>
      val cf = new HColumnDescriptor(x.getBytes())
      logDebug(s"add family $x to ${catalog.name}")
      tableDesc.addFamily(cf)
    }
    val startKey = Bytes.toBytes("aaaaaaa")
    val endKey = Bytes.toBytes("zzzzzzz")
    val splitKeys = Bytes.split(startKey, endKey, catalog.numReg - 3)
I am curious to know the reason for this if condition. I also checked this in the HBase shell: by default HBase creates 3 extra regions. Why is that?
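My reading of the arithmetic in that snippet, stated with hedging since I have not traced the HBase internals: Bytes.split(start, end, n) returns the n intermediate points plus the two endpoints, and a table created with k split keys gets k + 1 regions, so numReg - 3 intermediate points end up as numReg regions in total. A tiny sketch of that counting (my own helpers, not SHC code):

```scala
// Assumption: Bytes.split(start, end, n) yields n intermediate keys
// plus the two endpoints, i.e. n + 2 keys in total.
def splitKeyCount(intermediatePoints: Int): Int = intermediatePoints + 2

// Assumption: creating a table with k split keys yields k + 1 regions
// (there is one extra region before the first key and after the last).
def regionCount(splitKeys: Int): Int = splitKeys + 1

// With HBaseTableCatalog.newTable -> "5", numReg = 5:
val numReg = 5
val regions = regionCount(splitKeyCount(numReg - 3))
```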
Use the Spark API (https://github.com/apache/spark/blob/511f52f8423e151b0d0133baf040d34a0af3d422/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L226)
to tell Spark about the filters we aren't implementing, as opposed to returning all the filters.
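Conceptually, that API (unhandledFilters) lets a relation return the subset of pushed filters it cannot evaluate itself, so Spark re-applies only those rather than all of them. A minimal standalone sketch of the idea, using a stand-in Filter type rather than Spark's actual org.apache.spark.sql.sources classes:

```scala
// Stand-in filter ADT for illustration only; the real API is
// org.apache.spark.sql.sources.Filter and BaseRelation.unhandledFilters.
sealed trait Filter
case class EqualTo(attr: String, value: Any) extends Filter
case class In(attr: String, values: Array[Any]) extends Filter
case class StringContains(attr: String, value: String) extends Filter

// Suppose the data source can push down EqualTo and In, but not
// StringContains: only the latter is reported back as unhandled.
def unhandledFilters(filters: Array[Filter]): Array[Filter] =
  filters.filterNot {
    case _: EqualTo | _: In => true   // handled by the source, drop from result
    case _                  => false  // everything else goes back to Spark
  }
```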
I get a Connection refused error while running the examples provided here, using Spark 1.6.0 & HBase 1.2.0.
Error:
16/08/27 12:35:01 WARN zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:350)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
Starting spark-shell
export HADOOP_HOME=/opt/cloudera/parcels/CDH
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/etc/hadoop/conf}
HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/*
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf
export SPARK_HOME=/opt/cloudera/parcels/CDH/lib/spark
export SPARK_JAR_HDFS_PATH=/opt/cloudera/parcels/CDH/lib/spark/lib/spark-assembly.jar
export SPARK_CLASSPATH=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-client-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-common-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-server-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/guava-12.0.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/hbase-protocol-1.2.0-cdh5.7.1.jar:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.2.0-incubating.jar
spark-shell --master yarn-client --num-executors 2 --driver-memory 512m --executor-memory 512m --executor-cores 1 --jars jars/shc-0.0.11-1.6.1-s_2.10.jar --files /etc/hbase/conf/hbase-site.xml,/etc/hbase/conf/hdfs-site.xml,/etc/hbase/conf/core-site.xml
Summary: The catalog definition that maps a dataframe to HBase does not appear to consistently support multiple columns in one column family when loading/saving data.
1. Write mode error when more than one column comes from the same column family
In the spark shell, define the catalog:
def empcatalog = s"""{
|"table":{"namespace":"default", "name":"emp"},
|"rowkey":"key",
|"columns":{
|"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
|"city":{"cf":"personal data", "col":"city", "type":"string"},
|"empName":{"cf":"personal data", "col":"name", "type":"string"},
|"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
|"salary":{"cf":"professional data", "col":"salary", "type":"string"}
|}
|}""".stripMargin
Define the case class:
case class HBaseRecordEmp(
empNumber:String,
city:String,
empName:String,
jobDesignation:String,
salary:String)
Create some dummy data with Spark and try to write; it says the column family is already created:
val data = (4 to 10).map { i => {
val name = s"""Bobby${"%03d".format(i)}"""
HBaseRecordEmp(i.toString,
s"MyCity",
name,
"worker",
"5000")
}
}
sc.parallelize(data).toDF.write.options(
Map(HBaseTableCatalog.tableCatalog -> empcatalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
ERROR:
java.lang.IllegalArgumentException: Family 'professional' already exists so cannot be added
at org.apache.hadoop.hbase.HTableDescriptor.addFamily(HTableDescriptor.java:829)
2. Go to the HBase shell, manually create the table with 2 column families (each with two columns as above), and add some data. This works on READ:
def withCatalog(cat: String): DataFrame = {
  sqlContext.read.options(Map(HBaseTableCatalog.tableCatalog -> cat)).format("org.apache.spark.sql.execution.datasources.hbase").load()
}
val df = withCatalog(empcatalog)
df.show
3. Now that the table exists in HBase with the 2 column families as expected, add some dummy data in the spark shell and attempt the write again. This will work if you change the SaveMode to "append".
It seems like the HBase connector should support multiple columns in one column family as expected; this behavior is inconsistent.
Hello, thank you for the nice HBase on Spark SQL package.
I am currently facing certain challenges when writing to / reading from HBase with Spark.
Hadoop 2.7.3
Spark 2.0.1
Hbase 1.2.4
Hive 2.0.1 with MySql as a Metastore
Code:
$SPARK_HOME/bin/spark-shell --packages zhzhan:shc:0.0.11-1.6.1-s_2.10 --files /usr/local/spark/conf/hbase-site.xml
Where the hbase-site.xml content is:
hbase.rootdir file:///home/hduser/hbase
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.execution.datasources.hbase._
def empcatalog = s"""{
|"table":{"namespace":"empschema", "name":"emp"},
|"rowkey":"key",
|"columns":{
|"empNumber":{"cf":"rowkey", "col":"key", "type":"string"},
|"city":{"cf":"personal data", "col":"city", "type":"string"},
|"empName":{"cf":"personal data", "col":"name", "type":"string"},
|"jobDesignation":{"cf":"professional data", "col":"designation", "type":"string"},
|"salary":{"cf":"professional data", "col":"salary", "type":"string"}
|}
|}""".stripMargin
case class HBaseRecordEmp(
empNumber:String,
city:String,
empName:String,
jobDesignation:String,
salary:String)
val data = (4 to 10).map { i => {
val name = s"""Bobby${"%03d".format(i)}"""
HBaseRecordEmp(i.toString,
s"MyCity",
name,
"worker",
"5000")
}
}
sc.parallelize(data).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> empcatalog, HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
ERROR:
16/12/24 12:57:51 WARN metastore.ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/12/24 12:57:52 WARN metastore.ObjectStore: Failed to get database default, returning NoSuchObjectException
16/12/24 12:57:58 ERROR metastore.RetryingHMSHandler: AlreadyExistsException(message:Database default already exists)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_database(HiveMetaStore.java:891)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
at com.sun.proxy.$Proxy19.create_database(Unknown Source)
java.lang.NoClassDefFoundError: org/apache/spark/Logging
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.getDeclaredConstructors0(Native Method)
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
at java.lang.Class.getConstructor0(Class.java:3075)
at java.lang.Class.newInstance(Class.java:412)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:455)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 52 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.Logging
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 69 more
I have been following this SO post to send data to HBase using SHC in PySpark.
However, now I need to read the data back and would like to filter by time range.
I was wondering, is it possible to use filters in SHC with PySpark?
How to fetch byte[] in SHC?
Is there a plan (or near-future plan) to add support for salting?
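Until such support exists, the usual client-side workaround is to prepend a hash-derived bucket prefix to the row key before writing, so sequential keys spread across regions. A rough sketch of the idea (my own helper functions, not an SHC API):

```scala
// Hypothetical salting helpers, not part of SHC: spread sequential row keys
// across `buckets` prefixes to avoid region hotspotting.
def saltedKey(rowKey: String, buckets: Int): String = {
  val bucket = math.abs(rowKey.hashCode % buckets)
  f"$bucket%02d-$rowKey"  // e.g. "07-row123"
}

// The cost: every read/scan then has to fan out over all possible prefixes.
def allPrefixes(buckets: Int): Seq[String] =
  (0 until buckets).map(b => f"$b%02d-")
```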
No idea how difficult it would be, but having BulkLoad support would be great.
How can we increase parallelism and efficiency in the SHC connector?
Can we have a document, a README section, or some parameters on this?
The current composite row key support assumes the row key is like "part1:part2:..partn", where each part is either a well-defined data type like int or long, or any custom serdes with a fixed byte length.
But sometimes the row key may be composite and at the same time each part can be variable length. For example, the byte array generated by Bytes.toBytes(Long) has a variable length based on the value/scale of the long.
I'd like to propose supporting custom row key serdes to support the situation described above. For example, we can add a trait like this:
// Sedes for composite row key
trait RowSedes {
  def components: Int
  def serialize(value: Any*): Array[Byte]
  def deserialize(bytes: Array[Byte], start: Int, end: Int): Seq[Any]
}
And pass it with a rowSedes field in the catalog:
"table": {"namespace": "default", "name": "tbl"},
"rowkey": "part1:part2:part3",
"rowSedes": "com.example.spark.CustomRowSedes",
...
@weiqingy @dongjoon-hyun What do you think?
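To make the proposal concrete, here is one possible implementation of the suggested trait for variable-length components, using 4-byte length prefixes. This is entirely illustrative (the trait does not exist in SHC, and the class name is made up):

```scala
import java.nio.ByteBuffer

// The proposed trait, reproduced from the issue above.
trait RowSedes {
  def components: Int
  def serialize(value: Any*): Array[Byte]
  def deserialize(bytes: Array[Byte], start: Int, end: Int): Seq[Any]
}

// Hypothetical implementation: each component is written as a 4-byte
// length followed by its UTF-8 bytes, so parts may vary in length.
class LengthPrefixedSedes(val components: Int) extends RowSedes {
  override def serialize(value: Any*): Array[Byte] = {
    require(value.length == components, s"expected $components components")
    val parts = value.map(v => v.toString.getBytes("UTF-8"))
    val buf = ByteBuffer.allocate(parts.map(_.length + 4).sum)
    parts.foreach { p => buf.putInt(p.length); buf.put(p) }
    buf.array()
  }

  override def deserialize(bytes: Array[Byte], start: Int, end: Int): Seq[Any] = {
    val buf = ByteBuffer.wrap(bytes, start, end - start)
    (1 to components).map { _ =>
      val len = buf.getInt()
      val part = new Array[Byte](len)
      buf.get(part)
      new String(part, "UTF-8")
    }
  }
}
```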
I am hitting an issue while submitting an example with yarn-cluster deploy mode.
16/07/21 11:08:55 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cdh52.vm.com): java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:126)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:121)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:121)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:145)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$9.apply(HBaseTableScan.scala:277)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$9.apply(HBaseTableScan.scala:276)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I have the hbase-site.xml in the classpath and present in the spark conf dir too.
TableOutputFormat creates a connection instance and does not close it. Here is the HBase bug for it: https://issues.apache.org/jira/browse/HBASE-16017.
With Spark 1.6.1, by default a dataframe is divided into 200 partitions, and saveAsHadoopDataset internally creates a connection for each partition. So with every write there are 200 unclosed connections in memory, and over time the number of open connections reaches the limit ZooKeeper can handle and it starts tripping. Here is the code evidence:
public class TableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  public TableOutputFormat() {
  }

  public RecordWriter getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
    TableName tableName = TableName.valueOf(job.get("hbase.mapred.outputtable"));
    BufferedMutator mutator = null;
    Connection connection = ConnectionFactory.createConnection(job);
    mutator = connection.getBufferedMutator(tableName);
    return new TableOutputFormat.TableRecordWriter(mutator);
  }

  public void checkOutputSpecs(FileSystem ignored, JobConf job) throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    String tableName = job.get("hbase.mapred.outputtable");
    if (tableName == null) {
      throw new IOException("Must specify table name");
    }
  }

  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
    private BufferedMutator m_mutator;

    public TableRecordWriter(BufferedMutator mutator) throws IOException {
      this.m_mutator = mutator;
    }

    public void close(Reporter reporter) throws IOException {
      this.m_mutator.close();
    }

    public void write(ImmutableBytesWritable key, Put value) throws IOException {
      this.m_mutator.mutate(new Put(value));
    }
  }
}
The Connection instance is never closed, which is causing this issue.
The spark-hbase-connector doesn't cache connection objects to hbase. Specifically, the call to 'ConnectionFactory.createConnection' is done each time in HBaseResources.scala. This is an expensive operation as documented at https://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Connection.html. For long-lived processes, it would be very useful to keep a connection cache.
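A cache keyed by the connection configuration is the usual shape of such a fix. A simplified standalone sketch of the idea, using a stand-in connection type so it runs without an HBase cluster (class and method names are made up):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

// Stand-in for org.apache.hadoop.hbase.client.Connection, so this sketch
// is self-contained; a real cache would hold actual HBase connections.
class FakeConnection(val key: String)

object ConnectionCache {
  private val cache = new ConcurrentHashMap[String, FakeConnection]()

  // Reuse an existing connection for the same configuration key instead of
  // invoking the expensive factory (ConnectionFactory.createConnection)
  // on every call.
  def getConnection(confKey: String): FakeConnection =
    cache.computeIfAbsent(confKey, new JFunction[String, FakeConnection] {
      override def apply(k: String): FakeConnection = new FakeConnection(k)
    })

  def size: Int = cache.size()
}
```

A production version would also need reference counting or an eviction policy so idle connections eventually get closed.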
Hi, does this library support working with Kerberos?
I am using shc 2.11 with Spark 2.0.1. In the following example of catalog loading I am using sparkSession instead of sqlContext. It looks like it tries to create a directory similar to my cwd on HDFS! Is there a way I can configure a different temp directory for it? Also, is this the proper way to load a catalog with Spark 2?
def withCatalog(cat: String) = {
  sparkSession
    .read
    .options(Map(HBaseTableCatalog.tableCatalog -> cat))
    .format("org.apache.spark.sql.execution.datasources.hbase")
    .load
}
Caused by: org.apache.spark.SparkException: Unable to create database default as failed to create its directory hdfs:///home/centos/myapp/spark-warehouse
at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:114)
at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.createDatabase(InMemoryCatalog.scala:108)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:147)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.<init>(SessionCatalog.scala:89)
at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95)
at org.apache.spark.sql.internal.SessionState.catalog(SessionState.scala:95)
at org.apache.spark.sql.internal.SessionState$$anon$1.<init>(SessionState.scala:112)
at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:112)
at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:111)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:49)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:382)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:143)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:122)
at com.mypckg.DFInitializer.withCatalog(DFInitializer.scala:78)
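The directory in that stack trace is Spark 2.0's SQL warehouse location, which defaults to spark-warehouse under the current working directory. It can be pointed elsewhere via the spark.sql.warehouse.dir setting when building the session; a configuration sketch (the path is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// spark.sql.warehouse.dir controls where Spark 2.x creates its SQL
// warehouse directory (default: ./spark-warehouse). The path below is
// only an example; pick a location your user can write to.
val sparkSession = SparkSession.builder()
  .appName("shc-example")
  .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
  .getOrCreate()
```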
I'm trying to load data from a DataFrame and write it to HBase. Whenever it tries to write, it chokes on converting the types. Do I have to extract everything into case classes? I would much rather just use the Row type that comes from Spark SQL.
This is with Spark 1.6 and the latest tagged release of shc.
Hello,
The command lines below are from my spark-shell invocation:
spark-shell --master yarn \
--deploy-mode client \
--name "hive2hbase" \
--repositories "http://repo.hortonworks.com/content/groups/public/" \
--packages "com.hortonworks:shc:1.0.1-1.6-s_2.10" \
--jars "shc-core-1.0.1-1.6-s_2.10.jar"
--files "/usr/hdp/current/hive-client/conf/hive-site.xml" \
--driver-memory 1G \
--executor-memory 1500m \
--num-executors 6 2> ./spark-shell.log
I have a simple DataFrame of Rows with count 5:
scala> newDf
res5: org.apache.spark.sql.DataFrame = [offer_id: int, offer_label: string, universe: string, category: string, sub_category: string, sub_label: string]
Its elements are of type Row:
scala> newDf.take(1)
res6: Array[org.apache.spark.sql.Row] = Array([28896458,Etui de protection bleu pour li...liseuse Cybook Muse Light liseuse Cybook Muse Light liseuse Cybook Muse HD Etui de protection bleu pour lis... Etui de protection noir pour lis... Etui de protection rose pour lis... Etui de protection orange liseus...,null,null,null,null])
I try to insert this with the following catalog:
scala> cat
res0: String =
{
"table":{"namespace":"default", "name":"offDen3m"},
"rowkey":"key",
"columns":{
"offer_id":{"cf":"rowkey", "col":"key", "type":"int"},
"offer_label":{"cf":"cf1", "col":"col1", "type":"string"},
"universe":{"cf":"cf2", "col":"col2", "type":"string"},
"category":{"cf":"cf3", "col":"col3", "type":"string"},
"sub_category":{"cf":"cf4", "col":"col4", "type":"string"},
"sub_label":{"cf":"cf5", "col":"col5", "type":"string"}
}
}
When I try to insert with the following code:
newDf.write
  .options(Map(HBaseTableCatalog.tableCatalog -> cat, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()
And I obtain the following stack:
17/01/03 10:36:42 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 149.202.161.158:37691 in memory (size: 6.4 KB, free: 511.1 MB)
java.lang.NoSuchMethodError: scala.runtime.IntRef.create(I)Lscala/runtime/IntRef;
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog.initRowKey(HBaseTableCatalog.scala:142)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog.<init>(HBaseTableCatalog.scala:152)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableCatalog$.apply(HBaseTableCatalog.scala:209)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.<init>(HBaseRelation.scala:163)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:58)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
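A likely cause (an assumption, not confirmed by the stack alone): scala.runtime.IntRef.create was introduced in Scala 2.11, so this NoSuchMethodError usually means a jar built for one Scala binary version is running against another (here, a _2.10 shc artifact mixed with a 2.11 runtime, or vice versa). A quick way to check what the shell is actually running:

```scala
// Print the Scala version of the running shell/driver; compare it with the
// _2.10 / _2.11 suffix of the shc artifact on the classpath.
val runtimeScala = scala.util.Properties.versionNumberString
println(s"Runtime Scala version: $runtimeScala")
```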
My question is twofold:
Thank you very much for helping
Hello,
We store arrays in HBase serialized with an Avro schema, and we need the connector to deserialize them. We have tried the Avro feature, which worked perfectly for other complex types, but in this case we serialize the data using only a simple array schema {"type": "array", "items": ["long","null"]} rather than a complete record schema.
The connector fails with a ClassCastException while trying to deserialize and cast to GenericRecord.
Do you have any plans to support such schemas or array types out of the box?
Thank you in advance!
Cheers,
Nikolay
Can this connector be released to Maven Central?
Please look at:
https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala#L115
and
https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/Utils.scala#L58
CellUtil.clone already creates a copy of the data, yet another copy is made within Utils.scala. Since binary data (blobs) can be fairly large, this extra copy may be an expensive operation.
I am trying to use shc on HDP. The Spark version in the cluster is 1.5.2, and the command I run is:
spark-submit --class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource --master yarn-client --jars /usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar --files /usr/hdp/current/hbase-client/conf/hbase-site.xml,/usr/hdp/current/hbase-client/conf/hdfs-site.xml /path/to/hbase-spark-connector-1.0.0.jar
Exception:
Exception in thread "main" org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions:
Wed Aug 10 14:55:21 EDT 2016, null, java.net.SocketTimeoutException: callTimeout=60000, callDuration=68118: row 'table2,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname={hostName},16020,1470239773946, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.throwEnrichedException(RpcRetryingCallerWithReadReplicas.java:271)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:195)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:59)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:320)
at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:295)
at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:160)
at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:155)
at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:821)
at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:193)
at org.apache.hadoop.hbase.client.MetaScanner.metaScan(MetaScanner.java:89)
at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.isTableAvailable(ConnectionManager.java:991)
at org.apache.hadoop.hbase.client.HBaseAdmin.isTableAvailable(HBaseAdmin.java:1400)
at org.apache.spark.sql.execution.datasources.hbase.HBaseRelation.createTable(HBaseRelation.scala:95)
at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:66)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:170)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource$.main(HBaseSource.scala:92)
at org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource.main(HBaseSource.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:685)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.net.SocketTimeoutException: callTimeout=60000, callDuration=68118: row 'table2,,00000000000000' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname={hostName},16020,1470239773946, seqNum=0
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:159)
at org.apache.hadoop.hbase.client.ResultBoundedCompletionService$QueueingFuture.run(ResultBoundedCompletionService.java:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Call to {hostname}/{ip}:16020 failed on local exception: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to {hostname}/{ip}:16020 is closing. Call id=9, waitTime=1
at org.apache.hadoop.hbase.ipc.RpcClientImpl.wrapException(RpcClientImpl.java:1259)
at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1230)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.scan(ClientProtos.java:32651)
at org.apache.hadoop.hbase.client.ScannerCallable.openScanner(ScannerCallable.java:372)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:199)
at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:62)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:346)
at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas$RetryingRPC.call(ScannerCallableWithReplicas.java:320)
at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithRetries(RpcRetryingCaller.java:126)
... 4 more
Caused by: org.apache.hadoop.hbase.exceptions.ConnectionClosingException: Connection to {hostName}/{ip}:16020 is closing. Call id=9, waitTime=1
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.cleanupCalls(RpcClientImpl.java:1047)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.close(RpcClientImpl.java:846)
at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.run(RpcClientImpl.java:574)
The cluster uses Kerberos authentication. The RegionServers are running well, and I can create or drop tables through the HBase shell.
I guess it is a permission or configuration problem, but I cannot figure it out.
Please help with an example of loading a CSV into HBaseRecord.
The example provided uses dummy data, but I am looking for something that can help bulk-load a CSV:
object HBaseRecord {
  def apply(i: Int): HBaseRecord = {
    val s = s"""row${"%03d".format(i)}"""
    HBaseRecord(s, i % 2 == 0, i.toDouble, i.toFloat, i, i.toLong, i.toShort,
      s"String$i extra", i.toByte)
  }
}
At line 194 in HBaseTableScan (https://github.com/hortonworks-spark/shc/blob/master/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseTableScan.scala#L194) the number of rows is logged at the debug level. By changing the log level to info, users could collect metrics about what percentage of their Spark job is spent reading data out of HBase without having to sort through many of the other debug log messages.
Hi,
I'm testing your connector on an HDP cluster and I get these errors:
16/06/19 18:26:40 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
16/06/19 18:26:40 INFO ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
16/06/19 18:26:40 WARN ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1125)
I submit the Spark program like this:
spark-submit --master yarn-client --class test.TestSHC --jars /usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/usr/hdp/current/hbase-client/lib/hbase-client.jar,/usr/hdp/current/hbase-client/lib/hbase-common.jar,/usr/hdp/current/hbase-client/lib/hbase-server.jar,/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar,/usr/hdp/current/hbase-client/lib/hbase-protocol.jar,/usr/hdp/current/hbase-client/lib/htrace-core-3.1.0-incubating.jar,/home/cyrille/lib/shc-0.0.11-1.6.1-s_2.10.jar --files /usr/hdp/current/hbase-client/conf/core-site.xml,/usr/hdp/current/hbase-client/conf/hbase-site.xml --num-executors 3 spark-1.0.0-SNAPSHOT.jar
Could you help me solve this problem?
Thanks
Hi
How can I use shc to connect to a remote HBase cluster? Where can I specify the ZooKeeper quorum and the master?
Best
Mikolaj Habdank
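shc picks up the cluster location from the HBase client configuration (an hbase-site.xml on the classpath, or shipped via --files) rather than from a dedicated option. A sketch of the relevant entries (hostnames and port are placeholders):

```xml
<configuration>
  <!-- ZooKeeper ensemble of the remote HBase cluster (placeholder hosts). -->
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>zk1.example.com,zk2.example.com,zk3.example.com</value>
  </property>
  <!-- ZooKeeper client port; 2181 is the usual default. -->
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>
```

The HBase master is discovered through ZooKeeper, so it does not need to be listed explicitly.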
I am getting the following error when using bigint, long, or double datatypes. It runs if I use string. Also, the documentation says it supports Java primitive types, but the examples use bigint, tinyint, and smallint, which are not Java types.
Caused by: java.lang.IllegalArgumentException: offset (0) + length (8) exceed the capacity of the array: 4
at org.apache.hadoop.hbase.util.Bytes.explainWrongLengthOrOffset(Bytes.java:631)
at org.apache.hadoop.hbase.util.Bytes.toLong(Bytes.java:605)
at org.apache.hadoop.hbase.util.Bytes.toDouble(Bytes.java:729)
at org.apache.spark.sql.execution.datasources.hbase.Utils$.hbaseFieldToScalaType(Utils.scala:51)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:123)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$4.apply(HBaseTableScan.scala:114)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD.buildRow(HBaseTableScan.scala:114)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:205)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anon$3.next(HBaseTableScan.scala:186)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
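The "offset (0) + length (8) exceed the capacity of the array: 4" message says the cell holds 4 bytes (an int) while the catalog declares an 8-byte type, so Bytes.toLong reads past the end of the value. The same width mismatch can be reproduced with plain NIO (which, like HBase's Bytes, uses big-endian encoding):

```scala
import java.nio.ByteBuffer

// Store a 4-byte int, then try to read it back as an 8-byte long,
// mirroring a catalog that declares "long" for an int-encoded cell.
val fourBytes = ByteBuffer.allocate(4).putInt(42).array()
val underflow =
  try { ByteBuffer.wrap(fourBytes).getLong; false }
  catch { case _: java.nio.BufferUnderflowException => true }
println(s"Reading a long from 4 bytes fails: $underflow")
```

The fix is to make the catalog type match the width of the bytes actually written to HBase.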