'Unable to read Hbase data with spark in yarn cluster mode
Cluster configuration:
- Hadoop: CDH-6.2.1
- Spark: 2.4.0
- Hbase: 2.0
What I do: Read HBase data through Spark
When I use IntelliJ and local mode everything works fine, but when I change mode to
spark-submit --master yarn, the following stacktrace happens:
20/05/20 11:00:46 ERROR mapreduce.TableInputFormat: java.io.IOException: java.lang.reflect.InvocationTargetException
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:221)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:114)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.initialize(TableInputFormat.java:200)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:243)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:219)
... 27 more
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.client.ConnectionImplementation.close(ConnectionImplementation.java:1938)
at org.apache.hadoop.hbase.client.ConnectionImplementation.<init>(ConnectionImplementation.java:310)
... 32 more
20/05/20 11:00:46 ERROR yarn.ApplicationMaster: User class threw exception: java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details.
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:254)
at org.apache.hadoop.hbase.mapreduce.TableInputFormat.getSplits(TableInputFormat.java:254)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2146)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
at com.song.HbaseOnSpark1$.main(HbaseOnSpark1.scala:32)
at com.song.HbaseOnSpark1.main(HbaseOnSpark1.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:673)
Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:558)
at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:249)
... 24 more
This is my code:
val conf: SparkConf = new SparkConf().setAppName("spark1")
val spark = new SparkContext(conf)
val hbaseConf: Configuration = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","hadoop01,hadoop02,hadoop03")
hbaseConf.set(TableInputFormat.INPUT_TABLE,"idx_name")
hbaseConf.set("hbase.defaults.for.version.skip", "true")
val rdd: RDD[(ImmutableBytesWritable, Result)] = spark.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result]
)
Solution 1:[1]
its hbase classpatth issue in your cluster but you need to add hbase jars to your classpath like this
export SPARK_CLASSPATH=$SPARK_CLASSPATH:`hbase classpath`
hbase classpath will give all the jars for hbase connections and etc....
Why its working in local mode ?
Since all the jars required are there in ide lib
If you are using maven do a mvn depdency:tree to understand what jars are needed in the cluster. based on that you can adjust your spark-submit script.
if you are using --jars option see that all jars passed correctly or uber jar has correct dependencies when packing jar..
There might be jar conflict also check that carefully with local mode environment since thats working fine.
Further reading Spark spark-submit --jars arguments wants comma list, how to declare a directory of jars?
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Community |
