微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark与HBase的集成

目录

版本说明:

  • hbase版本:hbase-1.3.1
  • spark版本:spark-2.4.7-bin-hadoop2.7

一、Spark与HBase的集成

背景:
Spark支持多种数据源,但是Spark对HBase的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的数据源API自己实现了一套比较方便操作HBase的API。

数据模型:

row,addres,age,username

001,guangzhou,20,alex
002,shenzhen,34,jack
003,beijing,23,lili

需求分析:
通过spark读取hbase中的数据或者将数据写入到hbase中。

添加配置:
在pom.xml文件添加如下配置:

<!-- hbase依赖包 -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

代码实现:

package com.kfk.spark.sql

import com.kfk.spark.common.CommSparkSessionScala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

/**
 * @author : 蔡政洁
 * @email :[email protected]
 * @date : 2020/12/10
 * @time : 8:53 下午
 */
object HBaseSpark {
    def main(args: Array[String]): Unit = {

        // configuration
        val spark = CommSparkSessionScala.getSparkSession()
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
        hbaseConf.set("hbase.zookeeper.quorum","bigdata-pro-m04")
        hbaseConf.set("hbase.master","bigdata-pro-m04:60010")

        // get
        getHBase(hbaseConf,spark)

        // write
        writeHBase(hbaseConf,spark)

    }

    /**
     * 读取hbase中的数据
     * @param hbaseConf
     * @param spark
     */
    def getHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={

        // 获取表名
        hbaseConf.set(TableInputFormat.INPUT_TABLE,"stu")

        // 将hbase中的数据转换成rdd
        val hbaseRDD =  spark.sparkContext.newAPIHadoopRDD(hbaseConf,
            classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])

        // 打印数据
        hbaseRDD.foreach(result => {
            val key = Bytes.toString(result._2.getRow)
            val addres = Bytes.toString(result._2.getValue("info".getBytes(),"addres".getBytes()))
            val age = Bytes.toString(result._2.getValue("info".getBytes(),"age".getBytes()))
            val username = Bytes.toString(result._2.getValue("info".getBytes(),"username".getBytes()))

            println("row key:" + key + " addres=" + addres + " age=" + age + " username=" + username)hashCode()

            /**
             * row key:001 addres=guangzhou age=20 username=alex
             * row key:002 addres=shenzhen age=34 username=jack
             * row key:003 addres=beijing age=23 username=lili
             */

        })
    }

    /**
     * 将数据写入到hbase
     * @param hbaseConf
     * @param spark
     */
    def writeHBase(hbaseConf : Configuration,spark : SparkSession): Unit ={

        // 初始化job,设置输出格式,TableOutputFormat 是 org.apache.hadoop.hbase.mapred 包下的
        val jobConf = new JobConf(hbaseConf)

        jobConf.setoutputFormat(classOf[TableOutputFormat])

        // 获取表名
        jobConf.set(TableOutputFormat.OUTPUT_TABLE,"stu")

        // 准备数据
        val array = Array("004,shanghai,25,jone",
                          "005,nanjing,31,cherry",
                          "006,wuhan,18,pony")

        val rdd = spark.sparkContext.makeRDD(array)

        // 将写入到hbase的数据转换成rdd
        val saveRDD = rdd.map(line => line.split(",")).map(x => {

            /**
             * 一个Put对象就是一行记录,在构造方法中指定主键
             * 所有插入的数据 须用 org.apache.hadoop.hbase.util.Bytes.toBytes 转换
             * Put.addColumn 方法接收三个参数:列族,列名,数据
             */
            val put = new Put(Bytes.toBytes(x(0)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("addres"),Bytes.toBytes(x(1)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(x(2)))
            put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("username"),Bytes.toBytes(x(3)))
            (new ImmutableBytesWritable,put)
        })

        // 写入到hbase中
        saveRDD.saveAsHadoopDataset(jobConf)

    }
}

运行结果:

row key:001 addres=guangzhou age=20 username=alex
row key:002 addres=shenzhen age=34 username=jack
row key:003 addres=beijing age=23 username=lili
row key:004 addres=shanghai age=25 username=jone
row key:005 addres=nanjing age=31 username=cherry
row key:006 addres=wuhan age=18 username=pony

本来源数据只有前三行,通过写入后三行,再打印出结果。

二、Spark sql与HBase的集成

Spark sql与HBase集成,其核心就是Spark sql通过hive外部表来获取HBase的表数据。

将hbase、hive、MysqL相关jar包拷贝到spark的jars目录下

hbase:
hbase-client-1.3.1.jar
hbase-common-1.3.1.jar
hbase-protocol-1.3.1.jar
hbase-server-1.3.1.jar
metrics-core-2.2.0.jar

hive:
hive-hbase-handler-2.3.3.jar
htrace-core-3.1.0-incubating.jar

MysqL:
mysql-connector-java-5.1.48-bin.jar

创建与HBase集成的Hive的外部表:

CREATE EXTERNAL TABLE stu(
id string,
addres string,
age string,
username string) 
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES (
"hbase.columns.mapping" = 
":key,info:addres,info:age,info:username") 
TBLPROPERTIES ("hbase.table.name" = "stu");

关于hive和hbase的集成请看这篇文章
https://blog.csdn.net/weixin_45366499/article/details/110955459

启动hive中的metastore

bin/hive --service metastore

启动spark-shell:

val df = spark.sql("select * from hivespark.stu").show
+---+---------+---+--------+
| id|   addres|age|username|
+---+---------+---+--------+
|001|guangzhou| 20|    alex|
|002| shenzhen| 34|    jack|
|003|  beijing| 23|    lili|
|004| shanghai| 25|    jone|
|005|  nanjing| 31|  cherry|
|006|    wuhan| 18|    pony|
+---+---------+---+--------+

以上内容仅供参考学习,如有侵权请联系我删除
如果这篇文章对您有帮助,左下角的大拇指就是对博主最大的鼓励。
您的鼓励就是博主最大的动力!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐