小编给大家分享一下 Spark 与 HBase 怎么配合使用:下面的第一个示例演示如何用 Spark 从 HBase 表中读取数据,第二个示例演示如何用 Spark 向 HBase 表中写入数据。希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!
package hgs.spark.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

/**
 * Example: read a row range of the HBase table `test1` through Spark and print
 * the values of `cf1:col1` and `cf1:col2` for every row.
 *
 * The scan is configured purely through [[TableInputFormat]] configuration keys
 * and executed with `SparkContext.newAPIHadoopRDD`.
 */
object HbaseTest {

  /** Placeholder printed when a row has no cell for the requested column. */
  private val MissingValue = "<missing>"

  /**
   * Decodes a cell value as a UTF-8 string.
   *
   * `Result.getValue` returns `null` when the row has no such column, so the
   * raw bytes are wrapped in an `Option` instead of being passed to
   * `new String(...)`, which would throw a NullPointerException.
   */
  private def cellAsString(bytes: Array[Byte]): String =
    Option(bytes).map(b => Bytes.toString(b)).getOrElse(MissingValue)

  /**
   * Entry point. Runs a local Spark job that scans rows in `["h", "x")` of
   * table `test1` and prints one `(col1, col2)` tuple per row.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    try {
      // HBaseConfiguration.create() is the supported factory; the public
      // constructor used previously is deprecated.
      val hadoopconf = HBaseConfiguration.create()
      hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
      hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

      val tableName = "test1"
      hadoopconf.set(TableInputFormat.INPUT_TABLE, tableName)
      hadoopconf.set(TableInputFormat.SCAN_ROW_START, "h")
      hadoopconf.set(TableInputFormat.SCAN_ROW_STOP, "x")
      // SCAN_COLUMNS is a SPACE-delimited list of `family:qualifier` entries.
      // The previous comma-separated value was parsed as a single bogus
      // qualifier, and the scan only returned data because SCAN_COLUMN_FAMILY
      // was also set (which selects the whole family). Restricting the scan to
      // the two columns that are actually read makes the intent explicit.
      hadoopconf.set(TableInputFormat.SCAN_COLUMNS, "cf1:col1 cf1:col2")

      /* Alternative: build the Scan programmatically and pass it serialized.
       * (Requires importing Scan, ProtobufUtil and Base64.)
       *
       * val startrow = "h"
       * val stoprow = "w"
       * val scan = new Scan
       * scan.setStartRow(startrow.getBytes)
       * scan.setStopRow(stoprow.getBytes)
       * val proto = ProtobufUtil.toScan(scan)
       * val scanToString = Base64.encodeBytes(proto.toByteArray())
       * println(scanToString)
       * hadoopconf.set(TableInputFormat.SCAN, scanToString)
       */

      val hbaseRdd = context.newAPIHadoopRDD(
        hadoopconf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      hbaseRdd.foreach { case (_, result) =>
        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes which
        // depends on the JVM's default charset.
        val family = Bytes.toBytes("cf1")
        val col1 = cellAsString(result.getValue(family, Bytes.toBytes("col1")))
        val col2 = cellAsString(result.getValue(family, Bytes.toBytes("col2")))
        println((col1, col2))
      }
    } finally {
      // Release Spark resources even when the job fails.
      context.stop()
    }
  }
}
package hgs.spark.hbase

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

/**
 * Example: write a small in-memory RDD of `(rowKey, value)` pairs into the
 * HBase table `test1`, storing each value in column `cf1:col1`.
 *
 * Uses the old-API (`mapred`) [[TableOutputFormat]] together with
 * `saveAsHadoopDataset`.
 */
object SparkToHbase {

  /**
   * Entry point. Runs a local Spark job that converts each pair into an HBase
   * `Put` and saves the result to table `test1`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("local")
    val context = new SparkContext(conf)

    try {
      val rdd = context.parallelize(List(("aaaaaaa", "aaaaaaa"), ("bbbbb", "bbbbb")), 2)

      // HBaseConfiguration.create() is the supported factory; the public
      // constructor used previously is deprecated.
      val hadoopconf = HBaseConfiguration.create()
      hadoopconf.set("hbase.zookeeper.quorum", "bigdata01:2181,bigdata02:2181,bigdata03:2181")
      hadoopconf.set("hbase.zookeeper.property.clientPort", "2181")

      // The JobConf copies hadoopconf, so the output table only needs to be
      // set once, on the JobConf that is handed to saveAsHadoopDataset.
      val jobconf = new JobConf(hadoopconf, this.getClass)
      jobconf.set(TableOutputFormat.OUTPUT_TABLE, "test1")
      // The method is `setOutputFormat`; the previous `setoutputFormat`
      // spelling does not exist and failed to compile.
      jobconf.setOutputFormat(classOf[TableOutputFormat])

      val puts = rdd.map { case (rowKey, value) =>
        // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes which
        // depends on the JVM's default charset.
        val put = new Put(Bytes.toBytes(rowKey))
        // addColumn replaces Put.add(family, qualifier, value), which is
        // deprecated since HBase 1.0 and removed in 2.x.
        put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("col1"), Bytes.toBytes(value))
        // TableOutputFormat takes the row from the Put, so the key stays empty.
        (new ImmutableBytesWritable, put)
      }

      puts.saveAsHadoopDataset(jobconf)
    } finally {
      // Release Spark resources even when the job fails.
      context.stop()
    }
  }
}
看完了这篇文章,相信你对“Spark 与 HBase 怎么用”有了一定的了解,如果想了解更多相关知识,欢迎关注编程之家行业资讯频道,感谢各位的阅读!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。