定义
Spaek
sql是Spark用来处理
结构化数据的
一个模块,它提供了2个编程抽象:DataFrame和DataSet,并且作为分布式
SQL查询引擎的作用。
特点
(1)易整合 (2)统一的数据访问方式 (3)兼容hive (4)标准的数据连接
DataFrame
DataFrame是
一个分布式数据容器,还记录数据的结构信息(schema),同时,也
支持嵌套数据类型。
创建:创建DataFrame有三种方式:通过Spark的数据源进行创建;从
一个存在的RDD进行转换;还可以从Hive Table进行
查询返回。
(1)查看Spark数据源进行创建的
文件格式
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
读取json
文件创建DataFrame
scala> val df = spark.read.json("/opt/module/spark/examples/src/main/resources/people.json")
df: org.apache.spark.
sql.DataFrame = [age: bigint, name: string]
(2)RDD转换:注意:如果需要RDD与DF或者DS之间操作,那么都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession对象的
名称】
前置条件:导入隐式转换并创建
一个RDD
scala> import spark.implicits._
import spark.implicits._
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
1)通过手动确定转换
scala> peopleRDD.map{x=>val p
ara = x.split(",");(p
ara(0),p
ara(1).trim.toInt)}.toDF("name","age")
res1: org.apache.spark.
sql.DataFrame = [name: string, age: int]
2)通过反射确定(需要用到样例类)
(1)创建
一个样例类
scala> case class People(name:String, age:Int)
(2)根据样例类将RDD转换为DataFrame
scala> peopleRDD.map{ x => val p
ara = x.split(",");People(p
ara(0),p
ara(1).trim.toInt)}.toDF
res2: org.apache.spark.
sql.DataFrame = [name: string, age: int]
(3)Hive表
DataSet
1)是DataFrame API的
一个扩展,是Spark最新的数据抽象
2)
用户友好的API风格,既具有类型安全检查也具有Dataframe的
查询优化特性。
3)Dataset
支持编解码器,当需要访问非堆上的数据时可以避免反序列化整个对象,提高了效率。
4)样例类被用来在Dataset中定义数据的结构信息,样例类中每个
属性的
名称直接映射到DataSet中的字段
名称。
5) Dataframe是Dataset的特列,DataFrame=Dataset[Row] ,所以可以通过as
方法将Dataframe转换为Dataset。Row是
一个类型,跟Car、Person这些的类型一样,所有的表结构信息我都用Row来表示。
6)DataSet是强类型的。比如可以有Dataset[Car],Dataset[Person].
7)DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对
一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的
错误检查。就跟JSON对象和类对象之
间的类比。
创建:
创建
一个RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
2)创建
一个样例类
scala> case class Person(name: String, age: Long)
defined class Person
3)将RDD转化为DataSet
scala> peopleRDD.map(line => {val p
ara = line.split(",");Person(p
ara(0),p
ara(1).trim.toInt)}).toDS()
RDD、DataFrame、DataSet之
间的联系和转换
共性
1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
2、三者都有惰性机制,在进行创建、转换,如map
方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算。
3、三者都会根据spark的内存情况
自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
4、三者都有partition的概念
5、三者有许多共同的
函数,如filter,排序等
6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行
支持
import spark.implicits._
7、DataFrame和Dataset均可使用模式匹配
获取各个字段的值和类型
区别:
1. RDD:
1)RDD一般和spark mlib同时使用
2)RDD
不支持spark
sql操作
2. DataFrame:
1)与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能
获取各个字段的值
2)DataFrame与Dataset一般不与spark mlib同时使用
3)DataFrame与Dataset均
支持spark
sql的操作,比如select,groupby之类,还能
注册临时表/视窗,进行
sql语句操作
4)DataFrame与Dataset
支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然


spark
sql案例:
val conf: SparkConf = new SparkConf().setAppName("Req1CategoryTop10App").setMaster("local[*]")
val sparkSession: SparkSession = SparkSession.builder().enableHiveSupport().con
fig(conf).g
etorCreate()
import sparkSession.implicits._
val df = spark.read.json("data/people.json")
//
displays the content of the DataFrame to stdout
df.show()
df.filter($"age" > 21).show()
df.createOrReplaceTempView("persons")
spark.
sql("SELECT * FROM persons where age > 21").show()
spark.stop()
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。