Sparksql
发展过程
解决的问题
-
@H_502_25@Spark sql
使用Hive
解析sql
生成AST
语法树, 将其后的逻辑计划生成, 优化, 物理计划都自己完成, 而不依赖Hive
-
执行计划和优化交给优化器
@H_502_25@Catalyst
-
内建了一套简单的
@H_502_25@sql
解析器, 可以不使用HQL
, 此外, 还引入和DataFrame
这样的DSL API
, 完全可以不依赖任何Hive
的组件 -
@H_502_25@Shark
只能查询文件,Spark sql
可以直接降查询作用于RDD
, 这一点是一个大进步
适用场景
定义 | 特点 | 举例 | |
---|---|---|---|
结构化数据 |
有固定的 |
有预定义的 |
关系型数据库的表 |
半结构化数据 |
没有固定的 |
没有固定的 |
指一些有结构的文件格式, 例如 |
非结构化数据 |
没有固定 |
没有固定 |
指文档图片之类的格式 |
SparkSession
SparkContext在读取文件的时候,读取出来的是 RDD, 不包含 Schema(结构化)信息。所以出现了SparkSession作为Sparksql的入口点,包括了 sqlContext
, HiveContext
, SparkContext
等组件的功能
val spark: SparkSession = new sql.SparkSession.Builder() .appName("hello") .master("local[6]") .getorCreate()
Catalyst优化器
Sparksql
大部分情况用于处理结构化数据和半结构化数据, 所以 Sparksql
可以获知数据的 Schema
, 从而根据其 Schema
来进行优化
Sparksql整体架构
-
@H_502_25@API
层简单的说就是Spark
会通过一些API
接受sql
语句 -
收到
@H_502_25@sql
语句以后, 将其交给Catalyst
,Catalyst
负责解析sql
, 生成执行计划等 -
@H_502_25@Catalyst
的输出应该是RDD
的执行计划 -
最终交由集群运行
@H_502_25@
简单优化过程
Catalyst
的主要运作原理是分为三步, 先对 sql
或者 Dataset
的代码解析, 生成逻辑计划, 后对逻辑计划进行优化, 再生成物理计划, 最后生成代码到集群中以 RDD
的形式运行
Dataset
-
@H_502_25@Dataset
是一个新的Spark
组件, 其底层还是RDD
-
@H_502_25@Dataset
提供了访问对象中某个特定字段的能力, 不用像RDD
一样每次都要针对整个对象做操作 -
@H_502_25@Dataset
和RDD
不同, 如果想把Dataset[T]
转为RDD[T]
, 则需要对Dataset
底层的InternalRow
做转换, 是一个比价重量级的操作
DataFrame
- @H_502_25@
- @H_502_25@
-
@H_502_25@DataFrame
具有数据对象的 Schema 信息 -
可以使用命令式的
@H_502_25@API
操作DataFrame
, 同时也可以使用sql
操作DataFrame
-
@H_502_25@DataFrame
可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建
Dataset 和 DataFrame 的异同
DataFrame
是Dataset
的一种特殊情况, 也就是说DataFrame
是Dataset[Row]
的别名。@H_502_25@DataFrame
和Dataset
所表达的语义不同@H_502_25@
DataFrame
表达的含义是一个支持函数式操作的表
, 而Dataset
表达是是一个类似RDD
的东西,Dataset
可以处理任何对象@H_502_25@DataFrame
中所存放的是Row
对象, 而Dataset
中可以存放任何类型的对象@H_502_25@DataFrame
的操作方式和Dataset
是一样的, 但是对于强类型操作而言, 它们处理的类型不同@H_502_25@DataFrame
只能做到运行时类型检查,Dataset
能做到编译和运行时都有类型检查@H_502_25@
代码解析:
def dataframe4(): Unit = { val spark = SparkSession.builder() .appName("dataframe1") .master("local[6]") .getorCreate() import spark.implicits._ val personList = Seq(Person("zhangsan", 15), Person("lisi", 20)) // DataFrame 是弱类型的 val df: DataFrame = personList.toDF() df.map( (row: Row) => Row(row.get(0), row.getAs[Int](1) * 2) )(RowEncoder.apply(df.schema)) .show() // DataFrame 所代表的弱类型操作是编译时不安全 // df.groupBy("name, school") // Dataset 是强类型的 val ds: Dataset[Person] = personList.toDS() ds.map( (person: Person) => Person(person.name, person.age * 2) ) .show() // Dataset 所代表的操作, 是类型安全的, 编译时安全的 // ds.filter( person => person.school ) }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。