微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

【SparkSQL】在IDEA中使用、创建DF、DF的操作(cache)、自定义函数、DF外部数据源(自定义)、DF与DS转换、DF/DS/RDD的区别、Catalog查看元数据信息

目录

一、在IDEA中使用

二、创建DF

  1、通过格式文件创建DF

  2、通过文本文件创建DF

    2.1、反射的方式

    2.2、编程的方式

三、DF的操作(cache)

  1、常用操作

  2、DF的cache操作

四、Spark SQL自定义函数

五、DF外部数据源

  1、(内置)读写parquet/json/csv文件实现文件类型转换

  2、表分区探测:

  3、(内置)SparkSQL操作Hive

  4、(内置)SparkSQL操作JDBC(MySQL/phoenix/ES)

  5、DF第三方外部数据源

  6、构建自己的外部数据源

    1、源码(外部数据源的执行流程)

    2、自定义外部数据源

六、DF与DS转换(隐式转换)

六、DF/DS/RDD的区别

七、Catalog查看元数据信息


 

一、在IDEA中使用

SparkCore的入口是SparkContext
Spark sql的入口是SparkSession,SparkSession的底层实际上也是SparkContext。

1)pom中加入依赖:

<properties>
    <scala.version>2.11.8</scala.version>
    <hadoop.version>2.6.5</hadoop.version>
    <spark.version>2.4.0</spark.version>
</properties>

<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>MysqL</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.28</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- 加入Sparksql的依赖-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

2)代码

import org.apache.spark.sql.SparkSession;
Object SparkSessionApp{
    def main(args:Array[String]):Unit={
        val spark = SparkSession.builder()
                            .master("local[2]")         //和使用Spark的时候类似
                            .appName("SparkSessionApp") //和使用Spark的时候类似
                            .getorCreate()              //有SparkSession对象的时候拿出来,没有的时候创建
                            //.config("spark.some.config.option", "some-value")  设置属性
        spark.sql("").show() 
        
        spark.stop()
    }
}

 

二、创建DF

创建DF有下面几种方式:
1)文件=>DataFrame
2)table in Hive=>DataFrame
3)外部数据源=>DataFrame
4)RDD=>DataFrame
严格的来说,这几种都应该算是外部数据源。

通用的读写方法

1)读:spark.read.format("").option().load(path)

2)写:spark.write.format("").option().save(path)

 

1、通过格式文件创建DF

格式文件就是含有schema的,比如json文件

{"name":"Michal"}
{"name":"Andy","age":30}
{"name":"Mike","age":19}

package com.ruoze
import org.apache.spark.sql.SparkSession;
object SparkSessionApp {
          def main(args:Array[String]):Unit = {
                    val spark = SparkSession.builder()
                      .master("local[2]")
                      .appName("SparkSessionApp")
                      .option("timestampformat", "yyyy/MM/dd HH:mm:ss ZZ")//在windows上有问题的时候加
                      .getorCreate()

                    val df = spark.read.json("file:///C:/xx/a.json")//是spark.read.format("json").load(path)的简写
                    df.show()

                    Thread.sleep(1000000)
                    spark.stop()
          }
}

结果:
+----+------+
| age|  name|
+----+------+
|null|Michal|  
|  30|  Andy|
|  19|  Mike|
+----+------+

 

2、通过文本文件创建DF

直接用:val df = spark.read.text("file:///C:/Users/小西学舞/Desktop/a.txt")
拿到的结果只有一个value字段,不方便后续使用,所以不推荐使用这种方式。
推荐通过RDD转为DF的方式

 

具体RDD->DF操作有两种方式:

1)反射:RDD[case class].toDF()

2)编程:RDD[Row]+schema  或者 RDD[Class]+ClassOf[Class]

准备文本文件

里面有三个脏数据。4,5里面没有name,6里面name是NULL:
1|Mike|12312345678|[email protected]
2|mery|12312345679|[email protected]
3|tom|12312345688|[email protected]
4||12312345689|[email protected]
5||12312345690|[email protected]
6|NULL|12312345691|[email protected]

2.1、反射的方式

package com.ruoze
import org.apache.spark.sql.SparkSession;
object DataFrameApp {
          def main(args:Array[String]):Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("DataFrameApp")
                              .getorCreate()

                    //toDF() 方法需要加入隐式转换
                    import spark.implicits._ 
                    val student = spark.sparkContext.textFile("file:///C:/xx/a.txt")
                              .map(_.split("\\|"))
                              .map(x => Student(x(0),x(1),x(2),x(3)))
                              .toDF()

                    Thread.sleep(1000000)
                    spark.stop()
          }

          case class Student(id:String,name:String,phone:String,email:String)
}

使用这种方式的缺点:
1)case class最多支持22个字段如果超过了22个字段。所以不支持太多字段。
2)而且要提前知道有哪些字段。
不通用。

 

2.2、编程的方式

package com.ruoze
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType};
object DataFrameApp {
          def main(args:Array[String]):Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("DataFrameApp")
                              .getorCreate()

                    //第一步:将原始RDD转为ROWS RDD 
                    val student = spark.sparkContext.textFile("file:///C:/Users/小西学舞/Desktop/a.txt")
                        .map(_.split("\\|")).map(x => Row(x(0),x(1),x(2),x(3)))

                    //第二步:定义Schema 
                    //1)StructType是一个case class,传入的参数是Array[StructField]
                    //2)StructField是一个case class,传入的参数是name type nullable(表中一个字段的名字、类型、是否为空)
                    //其中String类型,在这里的type要写StringType
                    val structType = StructType(Array(
                            StructField("id",StringType,true),
                            StructField("name",StringType,true),
                            StructField("phone",StringType,true),
                            StructField("email",StringType,true)
                    ))
                    //第三步
                    //createDataFrame方法是得到一个DataFrame
                    val df = spark.createDataFrame(student,structType)
                    df.show()

                    Thread.sleep(1000000)
                    spark.stop()
          }
}

注意:
1)RDD map之后的数据类型是RDD
2)DataFrame map 之后的数据类型是 DataSet[U]

 

三、DF的操作(cache)

1、常用操作

1、打印Schema数据,数据类型是可以自动推导的

df.printSchema() 
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

2、查询字段
方式1:传入String:df.select("name","age").show()
方式2:传入Column类型的值。又有两种方式:

1)导入隐式转换,使用$:
import spark.implicits._ 
df.select($"name").show()
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

2)使用dfname("")
df.select(df("name"),df("age"),df("age")+10).show() 
+------+----+----------+
|  name| age|(age + 10)|
+------+----+----------+
|Michal|null|      null|
|  Andy|  30|        40|
|  Mike|  19|        29|
+------+----+----------+

使用方式2的好处就是,可以对字段进行运算。

3、过滤:

df.filter($"age" > 21).show()
df.filter(df("age")>20).show()
student.filter("name=''").show
student.filter("name='' or name='NULL'").show
或者:
stu1.where("id>3").show(false) where内部用的就是filter


4、分组求某些运算

df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

5、转为临时表,以表的方式操作:

df.createOrReplaceTempView("people")
spark.sql("select * from people").show() 

6、对字段进行类型转换:df.select(df("name").cast(LongType))

7、show方法

show认只显示20条记录,源码:show()=>show(20)=>show(20,true),true表示超长的串会不给显示
show(false)=>show(20,false) false 表示超长的串会给显示
student.show(50,false) //展示50条  长串补全

8、take取值:

student.take(4).show //失败 坑  因为student.take(4)返回的是Array,不是RDD
student.take(4).foreach(println)
student.first //first底层调用head,head底层调用head(1)
student.head(3).foreach(println) //取前3个

9、排序:

student.sort($"name".desc).show(50)
student.sort($"name".desc,$"id".desc).show(50)

10、为列改名字
student.select($"phone".as("mobile")).show()

11、join:
stu1.join(stu2,stu1("id")===stu2("id")).show(false)
stu1.join(stu2,stu1("id")===stu2("id"),"outer").show(false)

 

很多方法都是在DS(org.apache.spark.sql)、Column(org.apache.spark.sql)、functions中定义好的,可以使用的。

1)DS中:像select、map、cache等都是。比如:df.map

2)Column中:像desc、>、equals、as。比如:df.select(df("")>10)

3)functions中:像upper、count、sum。比如:spark.sql("select id,name,upper("name") from xxx")

 

2、DF的cache操作

使用:
spark.catalog.cacheTable("tableName") 或者dataFrame.cache()。在sql中可以使用"cache table tableName;"

Spark UIstorage页签可以看到占用的内存大小。

Sparksql认使用的存储策略和RDD是一样的,也是使用MEMORY_ONLY

清理缓存:spark-sql (default)> uncache table test;

注意这里:
(1)在Spark Core里面cache操作是lazy的
(2)在Spark sql里面cache是eager的(立刻的)

 

四、Spark sql自定义函数

和Hive是很相似的。函数都在object functions中定义了,使用函数,比如:spark.sql("select id,name,upper("name") from xxx")

collect、collect_list、collect_set、count、first、last、sum、max等都是里面的函数

函数的使用:

1)spark.sql("select id,name,upper("name") from xxx")

2) df.select(df("name"),functions.trim(df("name")).as("name1")).show

3)df.selectExpr("name","trim('name') as name1").show

package com.ruoze
import org.apache.spark.sql.SparkSession
object FunctionApp {
          def main(args: Array[String]): Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("FunctionApp")
                              .getorCreate()

                    import spark.implicits._
                    val info = spark.sparkContext.textFile("file:///C:/Users/小西学舞/Desktop/b.txt")
                    val df = info.map(x =>x.split("\t")).map(x =>Info(x(0).trim,x(1).trim)).toDF()

                    //注册自定义函数
                    spark.udf.register("likes_nums",(str:String) =>{
                        str.split(",").size
                    })

                    //使用方式1:
                    df.createOrReplaceTempView("info")
                    spark.sql("select name,likes,likes_nums(likes) as cnt from info").show(false)

                    //使用方式2:
                    df.selectExpr("name","likes","likes_nums(likes) as countnum").show()

                    Thread.sleep(1000000)
                    spark.stop()
          }
        case class Info(name:String,likes:String)
}

从上面的例子可以知道,自定义函数不能以API的方式使用。

 

如果想以API的形式使用(未使用过):

package com.ruoze
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
//表示对输入数据的封装
case class Employee(name: String, salary: Long)
//得到结果的方式
case class Aver(var sum: Long, var count: Int)
/*
{"name":"Michael", "salary":3000}
{"name":"Andy", "salary":4500}
{"name":"Justin", "salary":3500}
{"name":"Berta", "salary":4000}
求平均工资
 */
class Average extends Aggregator[Employee, Aver, Double] {
  // 初始化方法 初始化每一个分区中的 共享变量
  override def zero: Aver = Aver(0L, 0)
  // 每一个分区中的每一条数据聚合的时候需要调用方法
  override def reduce(b: Aver, a: Employee): Aver = {
    b.sum = b.sum + a.salary
    b.count = b.count + 1
    b
  }
  // 将每一个分区的输出 合并 形成最后的数据
  override def merge(b1: Aver, b2: Aver): Aver = {
    b1.sum = b1.sum + b2.sum
    b1.count = b1.count + b2.count
    b1
  }
  // 给出计算结果
  override def finish(reduction: Aver): Double = {
    reduction.sum.todouble / reduction.count
  }
  // 主要用于对共享变量进行编码
  override def bufferEncoder: Encoder[Aver] = Encoders.product
  // 主要用于将输出进行编码
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object Average{
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("udaf").setMaster("local[*]")
    val spark = SparkSession.builder().config(sparkConf).getorCreate()
    import spark.implicits._
    val employee = spark.read.json("C:/Users/小西学舞/Desktop/a.json").as[Employee]
    val aver = new Average().toColumn.name("average") //name表示查询出来的平均值字段的名称

    employee.select(aver).show()
/*
运行的结果:
+-------+
|average|
+-------+
| 3750.0|
+-------+

 */
    Thread.sleep(1000000)
    spark.stop()
  }
}

 

五、DF外部数据源

首先要知道:
1)外部数据源有很多种HDFS、MysqL、json、orc、parquet、HBase/phoenix、ES、Hive
2)对外部数据源的操作无非就是两种:读、存。

Data Source 数据源有三种:
1)内置数据源 Build-in
2)第三方(引入jar包)
3)自定义外部数据源

Spark sql认的数据源是parquet:spark.sql.sources.default=parquet

查看认的数据源:
spark-sql (default)> set spark.sql.sources.default;
key     value
spark.sql.sources.default       parquet

外部数据源通用的操作:
 spark.read.format("json").load("file:///XXXX.json")
 spark.write.format("parquet").save("hdfs://hadoop001:9000/XXXX.parquet")

 

1、(内置)读写parquet/json/csv文件实现文件类型转换

例子1:读写parquet文件

spark官方提供了一个json格式的文件
cd $SPARK_HOME
cd example/src/main/resources/
cat people.json 
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

package com.ruoze
import org.apache.spark.sql.SparkSession
import spark.implicits._
object DataSourceApp {
          def main(args:Array[String]):Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("DataFrameApp")
                              .getorCreate()
                    //1)简化的写法
                    spark.read.json("file:///XXX")
                    //2)标准写法,推荐
                    spark.read.format("json").load("file:///XXXX")
                    df.write.format("parquet").save("file:///C:/Users/小西学舞/Desktop/test/peopleparquet/")
                    Thread.sleep(1000000)
                    spark.stop()
          }
}

注意:
在windows上执行的时候,要加上:option(“timestampformat”, “yyyy/MM/dd HH:mm:ss ZZ”)
不然报错:`Exception in thread "main" java.lang.IllegalArgumentException: Illegal pattern component: XXX`

生成的是一个目录(parquet格式认使用snappy压缩):

例子2:读写csv文件

准备文件
name;age;job
Jorge;30;Developer
Bob;32;Developer

读取文件
val df = spark.read.format("csv").option("sep",";").option("header","true").load("file:///home/hadoop/data/a.csv").
注意:
1)option("sep",";"):指定分隔符
2)option("header","true"):展示头
3)如果是在Windows上运行,可以添加option("timestampformat", "yyyy/MM/dd HH:mm:ss ZZ")

写出文件
df.write.format("json").save("file:///home/hadoop/data/")
df1.write.format("csv").mode("overwrite").save("hdfs://hadoop001:9000/test/")
如果写多次,会提示目标目录已经存在,所以需要使用mode来指明覆盖还是追加的模式。

可以自定义schema:
val peopleschema = StructType(Array(
StructField("hlwname",StringType,true), 
StructField("hlwage",IntegerType,true), 
StructField("hlwjob",StringType,true)))
使用:.schema(peopleschema)

生产上的应用:
读入text,json等行式存储的文件,写出为ORC,parquet列式存储的文件,并加上压缩。
大大减少文件存储空间和IO,提高性能
比如:ETL清洗之后存储到Hive表上:df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("d", "h").save(outputPath)

 

2、表分区探测:

当Spark在load加载数据的时候,可以自动的推测出来加载的文件路径上的分区信息。

例子1:

在HDFS上创建分区路径:
hadoop fs -mkdir -p /sql/table/gender=male/country=US
hadoop fs -mkdir -p /sql/table/gender=male/country=CN
hadoop fs -mkdir -p /sql/table/gender=female/country=US
hadoop fs -mkdir -p /sql/table/gender=female/country=CN

hadoop fs -put users.parquet /sql/table/gender=male/country=US/

spark-shell
spark.read.format("parquet").load("hdfs://hadoop001:9000/sql/table").printSchema:
root                                                                            
 |-- name: string (nullable = true)
 |-- favorite_color: string (nullable = true)
 |-- favorite_numbers: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- gender: string (nullable = true)
 |-- country: string (nullable = true)
可以看到分区信息给推测出来了。

当然只支持numeric、date、timestamp、string。分区的类型一般都是string类型的。

如果不想让spark推测分区的数据类型:
spark.sql.sources.partitionColumnTypeInference.enabled=false(认是true)

如果加载的路径中包含一层分区信息,就不会再加载这层分区信息了:

spark.read.format("parquet").load("hdfs://hadoop0000:8020/sql/table/gender=male").printSchema:
root
    |--name:string(nullable = true)
    |--favorite_color:string(nullable = true)
    |--favorite_numbers:array(nullable = true)
    |  |--element:integer(containsNull = true)
    |--country:string(nullable = true)
可以看到只有country分区字段,所以使用起来是很灵活的。

 

3、(内置)Sparksql操作Hive

参考:http://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html

如果想访问Hive,必须开启enableHiveSupport:

val spark = SparkSession.builder()
            .master("")
            .appName("")
            .enableHiveSupport() 
            .getorCreate()
但是:在windows上不支持enableHiveSupport,会出现异常:Unable to instantiate SparkSession with Hive support because Hive classes are not found. 所以不能在windows上运行。

所以本地测试的时候只能本地建立表进行测试且不能用enableHiveSupport。如果想连接hive进行测试,需要开启enableHiveSupport且只能提交到集群上去运行。

测试使用:

1)Spark使用本地表,不访问Hive表(用的很少):

pom依赖:

<properties>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.6.5</hadoop.version>
            <spark.version>2.4.0</spark.version>
</properties>
<dependencies>
            <dependency>
                      <groupId>org.scala-lang</groupId>
                      <artifactId>scala-library</artifactId>
                      <version>${scala.version}</version>
            </dependency>
            <dependency>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-client</artifactId>
                      <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                      <groupId>MysqL</groupId>
                      <artifactId>mysql-connector-java</artifactId>
                      <version>5.1.28</version>
            </dependency>
            <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-sql_2.11</artifactId>   里面已经包含了spark-core_2.11依赖
                      <version>${spark.version}</version>
            </dependency>
            <dependency>
                      <groupId>org.apache.spark</groupId>
                      <artifactId>spark-hive_2.11</artifactId>
                      <version>${spark.version}</version>
            </dependency>
</dependencies>

代码

package com.ruoze
import org.apache.spark.sql.{Row,SaveMode,SparkSession}
object DataSourceApp {
          def main(args:Array[String]):Unit = {
                        val spark = SparkSession.builder()
                                  .master("local[2]")
                                  .appName("DataFrameApp")
                                  //.enableHiveSupport() 
                                  .getorCreate()

                        import spark.sql  
                        import spark.implicits._ 

                        sql("create table if not exists src(key int,value string) using hive") //这样就有两个数据库default、hive了
                        sql("load data local inpath 'C:/Users/小西学舞/Desktop/kv1.txt' into table src")
                        //也可以加载HDFS上的文件
                        //sql("load data local inpath '/home/hadoop/app/spark-2.4.2-bin-2.6.0-cdh5.7.0/examples/src/main/resources/kv1.txt' into table src")
                        val sqlDF = sql("select key,value from src where key < 10 order by key")
                        val stringsDS = sqlDF.map{
                             case Row(key:Int,value:String) => s"key:$key,Value:$value"
                        }
                        stringsDS.show()
                        Thread.sleep(1000000)
                        spark.stop()
          }
}

2)Spark使用Hive表,访问Hive表(需要提交到集群上去):

pom依赖:.....

代码

package com.ruoze
import org.apache.spark.sql.{Row,SaveMode,SparkSession}
object DataSourceApp {
          def main(args:Array[String]):Unit = {
                        val spark = SparkSession.builder()
                                  .master("local[2]")
                                  .appName("DataFrameApp")
                                  .enableHiveSupport() 
                                  .getorCreate()

                        import spark.sql  
                        import spark.implicits._ 

                        sql("show databases") //这样就可以看到Hive中的库了
                        ........
                        spark.stop()
          }
}

代码打包提交到集群上运行:

代码中去掉appName、mater,提交命令添加--master、--jars、--driver-classpath加上MysqL的驱动包。

3)存储进Hive:

方式1:df.write.mode(SaveMode.Overwrite).format("parquet").partitionBy("d","h").save(outputPath)

方式2:insert hive表sql

 

4、(内置)Sparksql操作JDBC(MysqL/phoenix/ES)

参考:http://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#jdbc-to-other-databases

比如读写MysqL、phoenix、ES。

从里面读:

package com.ruoze
import org.apache.spark.sql.{Row,SaveMode,SparkSession}
object DataSourceApp {
          def main(args:Array[String]):Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("DataFrameApp")
                              .getorCreate()
                    val jdbcDF = spark.read
                            .format("jdbc")
                            .option("url","jdbc:MysqL://192.168.xxx.xxx/ruozedata")
                            .option("dbtable","TBLS")
                            .option("user","root")
                            .option("password","123456")
                            .load()
                    jdbcDF.show()
                    Thread.sleep(1000000)
                    spark.stop()
          }
}

写进去:

1)通用的方式:df.write.jdbc()  不推荐。

2)推荐用df.foreachPartition

注意:
1)代码可以本地运行,也可以提交到集群上去运行,同样需要加入驱动包。
2)不同数据库库、表、字段可能转为大写,使用的时候最好都ucase处理:spark.sql("select ucase(name) from people").show()

 

5、DF第三方外部数据源

官网:https://spark-packages.org/

 

6、构建自己的外部数据源

1、源码(外部数据源的执行流程)

1)Spark.read.format(“json”).load(“”)
2)Load底层会调用方法,将BaseRelation转为DF:sparkSession.baseRelationToDataFrame(BashRelation)来创建DF
3)BashRelation的来源是:DataSource.resolveRelation()
4)resolveRelation方法内部是:
      通过RelationProvider.createRelation来创建Relation。传入sqlContext、option参数
      或者SchemaRelationProvider.createRelation来创建Relation。传入sqlContext、option、schema参数。
5)不同的RelationProvider的实现创建不同的BaseRelation。比如看的是JDBCRelationProver,他创建JDBCRelation
6)JDBCRelation
A.继承抽象类 JDBCRelation extends BaseRelation:BaseRelation里面可以传入一些sqlContext、schema、option(比如path)。也可以自定义schema。
B.实现一些接口
TableScan   实现全表扫描的方法:buildScan(): RDD[Row]
PrunedScan   实现列裁剪buildScan(requiredColumns: Array[String]): RDD[Row]
PrunedFilteredScan  实现列裁剪和过滤buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
InsertableRelation  实现插入insert(data: DataFrame, overwrite: Boolean)
7)创建Relation的工厂
DefaultSource extends RelationProvider:createRelation(sqlContext,p@R_404_6460@mMap):
              with SchemaRelationProvider:createRelation(sqlContext,p@R_404_6460@mMap,schema):
这里面来创建比如:TextDataSourceRelation。它的工厂的名字一定得是DefaultSource。
8)使用的时候spark.read.format(“Relation的包名就可以”).option(“path”,””).load()
本人测试的时候,发现只有全表扫描的方法是使用的,其他的方法没用过。全表扫描就可以实现全表、列裁剪、过滤。

2、自定义外部数据源

1)必须继承BaseRelation 抽象类,实现def schema: StructType字段的值。
2)实现接口,可选择进行实现
TableScan :全表扫描 select * from XXX  。方法:def buildScan(): RDD[Row]
PrunedScan :列裁剪 select a,b from xxx。
      方法:def buildScan(requiredColumns: Array[String], ): RDD[Row]
PrunedFilteredScan:裁剪+过滤 select a,b from xxx where c = 1
     方法:def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
InsertableRelation:插入。方法:def insert(data: DataFrame, overwrite: Boolean): Unit

1)数据:
10000,PK,0,100000,200000
10001,jepson,0,99999,199999
10002,17er,1,2000,5
10003,laoer,2,2001,6
10004,laoliang,0,2002,7

2)构建自己的Relation:TextDatasourceRelation
package com.mytest
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, sqlContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
class TextDatasourceRelation(override val sqlContext: sqlContext,
                             path:String,
                             userSchema:StructType)
          extends BaseRelation with TableScan
            with PrunedScan with PrunedFilteredScan
            with Logging{

          override def schema: StructType = {
            if(userSchema != null){
              userSchema
            } else {
              StructType(
            StructField("id",StringType,false) ::
              StructField("name",StringType,false) ::
              StructField("gender",StringType,false) ::
              StructField("salary",StringType,false) ::
              StructField("comm",StringType,false) :: Nil
              )
            }
          }

          // select * from xxx
          override def buildScan(): RDD[Row] = {
            logError("this is ruozedata custom buildScan...")

            var rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
            val schemaField = schema.fields

            // rdd + schemaField
            val rows = rdd.map(fileContent => {
              val lines = fileContent.split("\n")
              val data = lines.map(_.split(",").map(x=>x.trim)).toSeq

              val result = data.map(x => x.zipwithIndex.map{  //x是Array数组,将Array[String]转为Array[String,Index]
            case  (value, index) => {
              val columnName = schemaField(index).name
              if(columnName.equalsIgnoreCase("gender")) {
                if(value == "0") {
                  "男"
                } else if(value == "1"){
                  "女"
                } else {
                  "未知"
                }
              } else {
                value
              }

            }//case
              })
              result.map(x => Row.fromSeq(x))
            })

            rows.flatMap(x=>x)
          }

          //select a,b from xxx
          override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
            var rdd = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
            val schemaField = schema.fields

            // rdd + schemaField
            val rows = rdd.map(fileContent => {
              val lines = fileContent.split("\n")
              val data = lines.map(_.split(",").map(x=>x.trim)).toSeq

              val result = data.map(x => x.zipwithIndex.map{ //x是Array数组

            case  (value, index) => {
              val columnName = schemaField(index).name
              for(i <- requiredColumns){
                if(columnName.equalsIgnoreCase(i)){
                  if(columnName.equalsIgnoreCase("gender")) {
                if(value == "0") {
                  "男"
                } else if(value == "1"){
                  "女"
                } else {
                  "未知"
                }
                  } else {
                value
                  }//else
                }else{

                }
              }
            }//case
              })//result

              result.map(x => Row.fromSeq(x))
            })//fileContent

            rows.flatMap(x=>x)
          }
  
        override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] = {buildScan}

}

3)创建Relation的工厂:
package com.mytest
import org.apache.spark.sql.sqlContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType   //其中DefaultSource 名字是固定的
class DefaultSource extends RelationProvider with SchemaRelationProvider{


           //重载的创建Relation的方法 
           def createRelation(
                      sqlContext: sqlContext,
                      p@R_404_6460@meters: Map[String, String],
                      schema: StructType): BaseRelation = {
                    val path = p@R_404[email protected]("path")

                    path match {
                      case Some(p) => new TextDatasourceRelation(sqlContext,p,schema)
                      case _ => throw  new IllegalArgumentException("path is required...")
                    }
          }

          //RelationProvider接口的
          override def createRelation(sqlContext: sqlContext, p@R_404_6460@meters: Map[String, String]): BaseRelation = {
            createRelation(sqlContext,p@R_404_6460@meters,null)
          }
}

4)测试使用自定义的DataSource

package com.mytest
import org.apache.spark.sql.SparkSession
object TextApp {
      def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("TextApp").master("local[2]").getorCreate()
            val df = spark.read.format("com.mytest").option("path","C://Users/小西学舞/Desktop/mytest/").load()
            df.select("name","gender").show()
            spark.stop()
      }
}

format不能用简写,要用全称,写包名就可以。

 

六、DF与DS转换(隐式转换)

DF=DS[Row]

DF和DS的用法基本上是一致的。但是像map、filter等方法都是DS中的方法

需要在代码中导入隐式转换的包 import spark.implicits._。

 

将DataFrame转为DataSet有两种方式:

1)定义case class的方式:

     case class People(name:String,age:String,job:String)
     val ds = spark.read.format("").load("file:///xxx/people.json").as[People]

2)使用df.map函数

 

六、DF/DS/RDD的区别

1)schema:RDD只是一个集合,没有schema的概念, 并不知道里面具体的字段,但是DataFrame = RDD+schema,可以自定义schema。
2)性能RDD可以使用java/scala/python,分别是JVM/JVM/python执行引擎,性能不一样,不好优化。
但是DF都是转为逻辑执行计划之后再处理,所以不管是用什么语言,性能都是一样的。而且Spark sql的计算速度比RDD更快。
3)代码量:DF/DS有高级API:sql、DataFrame、DataSet,代码更少,但是RDD代码很多。
4)检查时机:DF=DS[Row]
sql:运行时才进行语法检查和分析检查
DF:编译时检查解析,运行时检查分析。
DS:检查解析/分析都在编译期间,把error提前暴露出来了。这是使用DataSet带来的最大的一个好处。
5)语言:DF支持使用Scala, Java, Python, and R进行开发,DS只支持Scala and Java. Python暂时不支持

 

七、Catalog查看元数据信息

想要查看Hive表的元数据,有下面几种方式:

1)直接查看MysqL表上的元数据。

2)通过desc formatted 查看部分元数据

3)通过Catalog接口,直接查的也是MysqL元数据。

Spark sql的执行流程中,分析的一步就是需要通过Catalog查看元数据。

package com.ruoze
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalog.Catalog
object FunctionApp {
          def main(args: Array[String]): Unit = {
                    val spark = SparkSession.builder()
                              .master("local[2]")
                              .appName("FunctionApp")
                              .getorCreate()

                    val catalog = spark.catalog
                    catalog.listDatabases().show()  
                    catalog.listTables("default").show()
                    catalog.listTables("default").select("name").show()
                    catalog.listFunctions()

                    Thread.sleep(1000000)
                    spark.stop()
          }
}

上面的代码只能在本地查,不能查询Hive,因为访问Hive需要有enableHiveSupport支持

 

但是可以在spark-shell中直接查:

scala> val catalog = spark.catalog
scala> catalog.listDatabases().show()
+-------+--------------------+--------------------+
|   name|         description|         locationUri|
+-------+--------------------+--------------------+
|default|Default Hive data...|hdfs://hadoop001:...|
|   hive|                    |hdfs://hadoop001:...|
+-------+--------------------+--------------------+

scala> catalog.listTables("hive").show()
+---------+--------+-----------+---------+-----------+
|     name|database|description|tableType|istemporary|
+---------+--------+-----------+---------+-----------+
|      emp|    hive|       null|  MANAGED|      false|
|g6_access|    hive|       null| EXTERNAL|      false|
|      src|    hive|       null|  MANAGED|      false|
|       t1|    hive|       null|  MANAGED|      false|
|     test|    hive|       null|  MANAGED|      false|
|      ttt|    hive|       null|  MANAGED|      false|
+---------+--------+-----------+---------+-----------+

scala> catalog.listTables("hive").select("name").show
+---------+
|     name|
+---------+
|      emp|
|g6_access|
|      src|
|       t1|
|     test|
|      ttt|
+---------+

scala> catalog.listFunctions().show
+----------+--------+-----------+--------------------+-----------+
|      name|database|description|           className|istemporary|
+----------+--------+-----------+--------------------+-----------+
|         !|    null|       null|org.apache.spark....|       true|
|         %|    null|       null|org.apache.spark....|       true|
|         &|    null|       null|org.apache.spark....|       true|
....................
+----------+--------+-----------+--------------------+-----------+

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐