微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark 写入HBase

我们将Spark处理完的数据一般吸入外部存储系统中,常见的外部存储系统有HBase,MysqL,ElasticSearch,redis,HDFS等。

现在我们主要介绍Spark写入HBase的方法。废话不多说,先贴代码

result.foreachPartition(it=>{

      //创建HBase连接
      val conn: client.Connection = HBaseUtil.getConnection("node2,node3,node4",2181)

      //创建容量为100的集合,存放批量的待写入HBase的数据
      val puts = new util.ArrayList[Put](100)
      val table = conn.getTable(TableName.valueOf("order"))
      //遍历迭代器中的数据
      it.foreach(bean=>{

        //设置数据,包括rk,列族的数据
        //设置rowkey
        val put = new Put(Bytes.toBytes(bean.oid))

        //设置列族的数据
        put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("category_name"),Bytes.toBytes(bean.categoryName))
        put.addColumn(Bytes.toBytes("order_info"),Bytes.toBytes("money"),Bytes.toBytes(bean.money))

        //将put放入puts这个list中
        puts.add(put)

        if(puts.size()==100){
          //将数据写入HBase
          table.put(puts)
          puts.clear()
        }

      })
      //将没有达到100的数据也写入到HBase中
      table.put(puts)
      conn.close()
    }
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

/**
  * Created by jihn88 on 2020/11/28.
  * HBase工具类,用来创建HBase 的connection
  * zkQuorum zookeeper地址,多个要用逗号分隔
  * port  zookeeper端口
  */
object HBaseUtil {
  def getConnection(zkQuorum: String,port: Int): Connection = synchronized{
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum",zkQuorum)
    conf.set("hbase.zookeeeper.property.clientPort",port.toString)
    ConnectionFactory.createConnection(conf)
  }

} 

我们这里使用HBase 中put API写入HBase。

HBaseUtil工具类用于建立HBase连接。这里使用foreachRDD算子,一个分区建立一个一次连接,如果使用foreach算子,一条数据就会建立一次连接,网络开销太大。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐