一.概述
1.前世今生
- 大量数据需要处理 ➡️ MapReduce出现
- sql on mr ➡️ Hive
- mr效率太低 ➡️ Tez
- Tez效率低 ➡️ Spark
- sql on spark ➡️ Shark(太多的的借鉴了Hive制约了它,然后被推翻了,现在已经被弃用)
- sql on spark ➡️ Sparksql
2.简介
- Spark sql是Spark处理数据的一个模块
- 专门用来处理结构化数据的模块,像json,parquet,avro,csv,普通表格数据等均可。
- 与基础RDD的API不同,Spark sql中提供的接口将提供给更多关于结构化数据和计算的信息,并针对这些信息,进行额外的处理优化。
3.操作方式
- Sparksql shell
- 类似于hive shell
- DataFrames API
- DataSets API
4.特点
- 可以利用sql、DataFrams API、DataSets API或其它语言调用的基于sparksql模块计算,均是sparkcore执行引擎,其对计算的表达是独立的,即开发人员可以轻松在不同API之间切换实现相同的功能。
- 也可以通过命令行、JDBC、ODBC的方式来操作Sparksql,方便其它数据平台、BI平台使用Sparksql模块。
- 在spark应用程序开发中,可以无缝使用Sparksql操作数据。
- 可以直接使用Hive表格数据。
- 与Hive的兼容性极好:它复用了Hive的前端(去掉驱动mapreduce执行任务的部分)和元数据,因此可以拿过来hivesql的东西在sparksql上运行即可。
- Sparksql的应用中,sql是一个重要方面,但不局限制sql。
5.Sparksql愿景
二.相关名词解释
- sql
- 数据查询语言,面向数据编程的最高抽象
- HQL = Hive sql
- sql on hadoop
- Shark
- 最早发展的sql on spark项目,已废弃
- Sparksql
- spark on sql 首先
三.shell 操作sparksql
四.DataFrames操作sparksql
1.项目创建
首先根据模板创建一个scala项目
模板:
group:net.alchim31.maven
artifact: scala-archetype-simple
version: 1.7
repository:https://maven.aliyun.com/repository/central
2.配置项目
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.11.11</scala.version>
<scala.compat.version>2.11</scala.compat.version>
<spec2.version>4.2.0</spec2.version>
</properties>
<!--scala依赖-->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!--sparkcore依赖 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<!--sparksql依赖-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.compat.version}</artifactId>
<version>2.3.2</version>
<scope>provided</scope>
</dependency>
<!--log4j-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j</artifactId>
<version>2.14.1</version>
</dependency>
- 在main下创建目录resouces目录,并将log4j的配置文件放入
3.代码编写
3.1DataFrames1.6
package com.antg.main
import org.apache.spark.sql.sqlContext
import org.apache.spark.{SparkConf, SparkContext}
object DataFrames1_6 {
def main(args: Array[String]): Unit = {
//创建sparkconf
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("测试DF1.6")
//创建上下文环境
val sc = new SparkContext(conf)
//创建sql上下文
val sqlContext = new sqlContext(sc)
//读取数据
val df = sqlContext.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
//显示全部信息
df.show()
//关闭上下文
sc.stop()
}
}
3.2Dataframes2.3
package com.antg.main
import org.apache.spark.sql.SparkSession
object DataFrames2_3 {
def main(args: Array[String]): Unit = {
//创建session
val sparkSession = SparkSession.builder()
.master("local[*]")
.appName("dataframes2.3")
.getorCreate()
//创建df
val df = sparkSession.read.json("C:\\Users\\Administrator\\Desktop\\data.json")
//虚表
val vrTable = df.createTempView("vrTable")
sparkSession.sql("select * from vrTable").show()
//数据持久化
df.repartition(2).write.format("parquet").save("./data")
//关闭
sparkSession.stop()
}
}
3.3rdd转换成df
package com.antg.main
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object RDD_DF {
def main(args: Array[String]): Unit = {
var sparkSession = SparkSession.builder()
.appName("test_rdd to df")
.master("local[*]")
.getorCreate()
var scheme = StructType(
"stdno name classId className".split(" ").map(t => StructField(t,StringType,true))
)
var lineRDD = sparkSession.sparkContext.textFile("C:\\Users\\Administrator\\Desktop\\student_MysqL.txt")
var rowRDD = lineRDD.map(_.split("\t")).map(row => Row(row(0),row(1),row(2),row(3)))
var df = sparkSession.createDataFrame(rowRDD,scheme)
df.show()
df.printSchema()
sparkSession.stop()
}
}
五.parquet数据格式
- 概述
- 产生背景
- 优点
六.DataSet 操作sparksql
- 环境搭建
- 与DataFrames一样
- 概述
- DataSet集成了RDD和DataFrame的优点,也称为强类型的DataFrame。
- DataSets和DataFrames具有完全相同的成员函数。
- 两者中,每个行的数据类型不同。DataFrame也可以叫Dataset[Row],即DataFrame是Dataset的一种特定形式。而DataSet的每一行是不固定的,需要模式匹配来确定。
- 版本说明
- 1.6 版本的时候为测试版本,好多API还不是很丰富
- 在2.0.0开始DataSet得到正式推广使用,由于其API和DataFrame在成员函数中完全对等,在使用上差异极小,由于是强类型,故仅在数据集case class模式匹配时,有明显差别。
- 例子
package com.antg.main
import org.apache.spark.sql.SparkSession
case class Student(name:String,age:BigInt)
object TestDS {
def main(args: Array[String]): Unit = {
//创建Session
val sparkSession = SparkSession.builder()
.appName("ds test")
.master("local[*]")
.getorCreate()
//引入自动隐式转换
import sparkSession.implicits._
//使用基础数据类型创建DataSet
val a = Seq(1,2,3).toDS()
//使用DataSet
a.map(_+1).collect.foreach(println)
a.show()
//使用样例类创建DS
val b = Seq(Student("tom",22)).toDS()
b.show()
//通过导入文件创建,并使用样例类指定DS的格式
val path = "C:\\Users\\Administrator\\Desktop\\student_data.txt"
val c = sparkSession.read.json(path).as[Student]
c.show()
//由于是强类型,所以这里可以很方便的操作ds中的内容
c.foreach(x=>println(x.age))
}
}
student_data.txt
{"name":"张一","age":10,"address":"国际庄"}
{"name":"张二","age":20}
{"name":"张三","age":30}
{"name":"张四","age":40}
七.各个数据集的对比分析
- spark数据集
- RDD
- DataFrames
- DataSet
- 相同点
- 全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利
- 三者都是惰性机制,只有遇到Action算子时才会提交作业
- 有许多共同的函数,如map、filter、sort等。
- 不同点
- 每个数据类型的应用场景
- RDD
- DataFrames(必须使用)
- R或是python语言开发者,使用DF
- DataSet()必须使用)
- 使用DF、DS场景
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。