目录
版本说明:
- hbase版本:hbase-1.3.1
- spark版本:spark-2.4.7-bin-hadoop2.7
一、Spark与HBase的集成
背景:
Spark支持多种数据源,但是Spark对HBase的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的数据源API自己实现了一套比较方便操作HBase的API。
数据模型:
row,addres,age,username
001,guangzhou,20,alex
002,shenzhen,34,jack
003,beijing,23,lili
需求分析:
通过spark读取hbase中的数据或者将数据写入到hbase中。
<!-- hbase依赖包 -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
代码实现:
package com.kfk.spark.sql
import com.kfk.spark.common.CommSparkSessionScala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession
/**
* @author : 蔡政洁
* @email :[email protected]
* @date : 2020/12/10
* @time : 8:53 下午
*/
object HBaseSpark {
def main(args: Array[String]): Unit = {
// configuration
val spark = CommSparkSessionScala.getSparkSession()
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
hbaseConf.set("hbase.zookeeper.quorum","bigdata-pro-m04")
hbaseConf.set("hbase.master","bigdata-pro-m04:60010")
// get
getHBase(hbaseConf,spark)
// write
writeHBase(hbaseConf,spark)
}
/**
* 读取hbase中的数据
* @param hbaseConf
* @param spark
*/
def getHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
// 获取表名
hbaseConf.set(TableInputFormat.INPUT_TABLE,"stu")
// 将hbase中的数据转换成rdd
val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
// 打印数据
hbaseRDD.foreach(result => {
val key = Bytes.toString(result._2.getRow)
val addres = Bytes.toString(result._2.getValue("info".getBytes(),"addres".getBytes()))
val age = Bytes.toString(result._2.getValue("info".getBytes(),"age".getBytes()))
val username = Bytes.toString(result._2.getValue("info".getBytes(),"username".getBytes()))
println("row key:" + key + " addres=" + addres + " age=" + age + " username=" + username)hashCode()
/**
* row key:001 addres=guangzhou age=20 username=alex
* row key:002 addres=shenzhen age=34 username=jack
* row key:003 addres=beijing age=23 username=lili
*/
})
}
/**
* 将数据写入到hbase
* @param hbaseConf
* @param spark
*/
def writeHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={
// 初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
val jobConf = new JobConf(hbaseConf)
jobConf.setoutputFormat(classOf[TableOutputFormat])
// 获取表名
jobConf.set(TableOutputFormat.OUTPUT_TABLE,"stu")
// 准备数据
val array = Array("004,shanghai,25,jone",
"005,nanjing,31,cherry",
"006,wuhan,18,pony")
val rdd = spark.sparkContext.makeRDD(array)
// 将写入到hbase的数据转换成rdd
val saveRDD = rdd.map(line => line.split(",")).map(x => {
/**
* 一个Put对象就是一行记录,在构造方法中指定主键
* 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
* Put.addColumn 方法接收三个参数:列族,列名,数据
*/
val put = new Put(Bytes.toBytes(x(0)))
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("addres"),Bytes.toBytes(x(1)))
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(x(2)))
put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("username"),Bytes.toBytes(x(3)))
(new ImmutableBytesWritable,put)
})
// 写入到hbase中
saveRDD.saveAsHadoopDataset(jobConf)
}
}
运行结果:
row key:001 addres=guangzhou age=20 username=alex
row key:002 addres=shenzhen age=34 username=jack
row key:003 addres=beijing age=23 username=lili
row key:004 addres=shanghai age=25 username=jone
row key:005 addres=nanjing age=31 username=cherry
row key:006 addres=wuhan age=18 username=pony
本来源数据只有前三行,通过写入后三行,再打印出结果。
二、Spark sql与HBase的集成
Spark sql与HBase集成,其核心就是Spark sql通过hive外部表来获取HBase的表数据。
将hbase、hive、MysqL相关jar包拷贝到spark的jars目录下
hbase:
hbase-client-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-server-1.3.1.jar
metrics-core-2.2.0.jar
hive:
hive-hbase-handler-2.3.3.jar
htrace-core-3.1.0-incubating.jar
MysqL:
mysql-connector-java-5.1.48-bin.jar
创建与HBase集成的Hive的外部表:
CREATE EXTERNAL TABLE stu(
id string,
addres string,
age string,
username string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" =
":key,info:addres,info:age,info:username")
TBLPROPERTIES ("hbase.table.name" = "stu");
关于hive和hbase的集成请看这篇文章:
https://blog.csdn.net/weixin_45366499/article/details/110955459
启动hive中的metastore:
bin/hive --service metastore
启动spark-shell:
val df = spark.sql("select * from hivespark.stu").show
+---+---------+---+--------+
| id| addres|age|username|
+---+---------+---+--------+
|001|guangzhou| 20| alex|
|002| shenzhen| 34| jack|
|003| beijing| 23| lili|
|004| shanghai| 25| jone|
|005| nanjing| 31| cherry|
|006| wuhan| 18| pony|
+---+---------+---+--------+
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。