Spark SQL
SchemaRDD/DataFrame
Introduction
- Used for structured data
How Spark SQL works: the Catalyst optimizer
- The optimizer turns the logical plan into a physical plan
Logical plan: full table scan -> projection -> filter -> projection
Optimization: reduce resource usage and improve query efficiency
1) Check whether a filter sitting above a projection can be pushed down:
2) Full table scan -> filter -> projection -> projection
Physical plan: full table scan -> filter -> projection
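A quick way to watch Catalyst do this is explain(true), which prints the parsed, analyzed and optimized logical plans plus the physical plan. A minimal sketch (the column names and data here are made up for illustration):
// In spark-shell (spark and spark.implicits._ are already available)
val planDF = Seq(("a", 1), ("b", 20), ("c", 3)).toDF("name", "value")
// Projection followed by a filter: Catalyst can push the filter down
planDF.select($"name", $"value").filter($"value" > 10).explain(true)
// In the optimized logical plan the Filter typically sits below the Project,
// so rows are discarded before the projection runs.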
Spark SQL API-1 Key points
SparkContext
- The programming entry point for Spark Core
SparkSession
- The programming entry point for Spark SQL (Spark 2.x)
val spark = SparkSession.builder().master("master").appName("appName")
  .getOrCreate()
Dataset
- A strongly typed collection of domain objects
Dataset = RDD + Schema
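The "Dataset = RDD + Schema" idea can be checked directly: every Dataset exposes both its underlying RDD and its schema. A small sketch:
val ds = Seq(("SS", 12), ("CC", 33)).toDS()
ds.rdd           // the underlying RDD[(String, Int)]
ds.schema        // StructType describing columns _1 and _2
ds.printSchema() // pretty-printed version of the same schema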
===========================================================
Example 1: Create a DataFrame with toDF
scala> val cc=Seq(("SS",12),("CC",33)).toDF("name","age")
scala> cc.show
+----+---+
|name|age|
+----+---+
|  SS| 12|
|  CC| 33|
+----+---+
Example 2: Create a Dataset with spark.createDataset
val ds=spark.createDataset(1 to 10)
val ds=spark.createDataset(List(("a"->1)))
ds.show() // show the contents; the tuple fields become columns _1 and _2
ds.where(ds("_2")>10).show
============================================================
Example 3: Create a Dataset from a case class
case class student(name:String,age:Int)
val rdd=sc.makeRDD(List(student("jc",19),student("jk",17)))
val ds1=rdd.toDS // convert the RDD to a Dataset
ds1.where(ds1("age")===19).show
===========================================================
scala> case class Order(id:String,customerId:String)
scala> case class OrderItem(orderid:String,subTotal:Double)
scala> val ordersRDD=sc.textFile("/text/orders.csv")
scala> val orderDS=ordersRDD.map(line=>{val cols=line.split(",");Order(cols(0),cols(2))}).toDS
scala> orderDS.show(2)
+---+----------+
| id|customerId|
+---+----------+
|  1|     11599|
|  2|       256|
+---+----------+
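OrderItem above is defined but never used; below is a hedged sketch of loading the items and joining them to the orders. The /text/order_items.csv path and the column positions are assumptions, not from the original.
// Hypothetical items file: order id in column 1, subtotal in column 4
val itemsRDD = sc.textFile("/text/order_items.csv")
val itemsDS = itemsRDD.map(line => {
  val cols = line.split(",")
  OrderItem(cols(1), cols(4).toDouble)
}).toDS
// Total spent per order: join on the order id, then sum the subtotals
orderDS.join(itemsDS, orderDS("id") === itemsDS("orderid"))
  .groupBy("id")
  .agg("subTotal" -> "sum")
  .show(2)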
Spark SQL API-4 DataFrame (tabular data)
import org.apache.spark.sql.Row
val row=Row(1,1.0,"abc")
row.get(0)
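Rows plus an explicit schema are also enough to build a DataFrame by hand with spark.createDataFrame; a minimal sketch (the column names are illustrative):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val rows = Seq(Row(1, 1.0, "abc"), Row(2, 2.0, "def"))
val schema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("score", DoubleType),
  StructField("label", StringType)))
val rowDF = spark.createDataFrame(sc.parallelize(rows), schema)
rowDF.printSchema()
rowDF.show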
Spark SQL API-6 Creating a DataFrame
val df=spark.read.json("/text/users.json")
df.show
df.printSchema() // print the structure (schema) of the data
The structure information of a DataFrame's data is called its schema.
- Converting a JSON file into a DataFrame
Method 1: the key is spark.read.json
scala> val df=spark.read.json("/text/users.json")
scala> df.show
+----+-------+
| Age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
scala> df.select("name").show
scala> df.select(col("name")).show
scala> df.select(column("name")).show
scala> df.select($"name").show
scala> df.select('name).show
+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+
// First register the DataFrame as a temporary table (registerTempTable)
scala> df.registerTempTable("Test")
// Then query it with a SQL statement
scala> spark.sql("select * from Test").show
+----+-------+
| Age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
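In Spark 2.x, registerTempTable is deprecated; createOrReplaceTempView is its replacement and behaves the same way here:
scala> df.createOrReplaceTempView("Test")
scala> spark.sql("select name from Test where Age > 20").show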
withColumn: commonly used
scala> df.withColumn("Age11",col("Age")+1).show
+----+-------+-----+
| Age|   name|Age11|
+----+-------+-----+
|null|Michael| null|
|  30|   Andy|   31|
|  19| Justin|   20|
+----+-------+-----+
===========================================================
// Rows can also be generated directly in SQL, e.g. by exploding an inline array
scala> spark.sql("select explode(array(1,2,3)) as name").show
+----+
|name|
+----+
|   1|
|   2|
|   3|
+----+
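explode is used just as often on an array column of a DataFrame; a small sketch (the tags column is made up):
scala> import org.apache.spark.sql.functions.explode
scala> val tagDF = Seq(("jc", Seq("spark", "sql")), ("jk", Seq("hive"))).toDF("name", "tags")
scala> tagDF.select($"name", explode($"tags").as("tag")).show
// Each element of the tags array becomes its own row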
Spark SQL API-8
- RDD -> DataFrame
- The key is calling toDF on the data (a Seq here, or an RDD as sketched at the end of this section), then show
scala> import org.apache.spark.sql.types._
scala> val dfs2=Seq(("js",19),("jk",18)).toDF("name","age")
dfs2: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> dfs2.show
+----+---+
|name|age|
+----+---+
|  js| 19|
|  jk| 18|
+----+---+
scala> val df3=Seq(1 to 10 :_*).toDF
df3: org.apache.spark.sql.DataFrame = [value: int]
scala> df3.show
+-----+
|value|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    6|
|    7|
|    8|
|    9|
|   10|
+-----+
// Convert the DataFrame back to an RDD
case class student(name:String,age:Int)
scala> dfs2.as[student].rdd
res39: org.apache.spark.rdd.RDD[student] = MapPartitionsRDD[84] at rdd at <console>:32
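As the section title says, toDF also works directly on an RDD; a sketch using the student case class defined above:
// RDD[student] -> DataFrame; column names come from the case class fields
val stuDF = sc.makeRDD(List(student("js", 19), student("jk", 18))).toDF()
stuDF.show
// An RDD of tuples can name its columns explicitly
val tupDF = sc.makeRDD(List(("js", 19), ("jk", 18))).toDF("name", "age")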
Spark SQL external data sources-1
// Save the DataFrame to HDFS
scala> val df=spark.read.json("/text/users.json")
df.write.save("/data/20190801")
df.write.parquet("/data/20190801")
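save() writes Parquet by default; the format and save mode can also be set explicitly, and the result read back. A sketch using the same path as above:
df.write.mode("overwrite").format("parquet").save("/data/20190801")
val parquetDF = spark.read.parquet("/data/20190801")
parquetDF.show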
Spark SQL external data sources-3
- Hive metastore stored in MySQL: direct access to Hive data
1) Start the Hive metastore
[root@head42 ~]# hive --service metastore &
netstat -ano | grep 9083    # check that the metastore is listening on port 9083
2) Start spark-shell connected to the Hive metastore
[root@head42 ~]# spark-shell --conf spark.hadoop.hive.metastore.uris=thrift://localhost:9083
3)scala> spark.sql("show tables").show
spark.sql("select * from database_name.table_name")//访问其他数据库
+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|      customer|      false|
| default|text_customers|      false|
+--------+--------------+-----------+
That's it!
Viewing Hive table data from Scala
hive> create table toronto(
> full_name string,
> ssn string,
> office_address string);
hive> insert into toronto(full_name,ssn,office_address)values("jc","ssss","1111");
scala> spark.sql("use databases")
val df=spark.table("hive_table")
df.printSchema
df.show
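Writing a DataFrame back into Hive works through saveAsTable; a minimal sketch (the target table name is illustrative):
// Requires the Hive-enabled session configured above
df.write.mode("overwrite").saveAsTable("toronto_copy")
spark.sql("select * from toronto_copy").show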
Spark SQL external data sources-4
- RDBMS tables
val url = "jdbc:MysqL://localhost:3306/MysqL"
val tableName = "TBLS"
// 设置连接用户、密码、数据库驱动类
val prop = new java.util.Properties
prop.setProperty("user","hive")
prop.setProperty("password","mypassword")
prop.setProperty("driver","com.MysqL.jdbc.Driver")
// 取得该表数据
val jdbcDF = spark.read.jdbc(url,tableName,prop)
jdbcDF.show
// Save the DataFrame to a new table
jdbcDF.write.mode("append").jdbc(url,"t1",prop)
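For large tables the JDBC read can be split across partitions on a numeric column; a hedged sketch (the TBL_ID column and its bounds are assumptions about the TBLS table):
// Split the read into 4 partitions over TBL_ID in [1, 1000]
val partDF = spark.read.jdbc(url, tableName, "TBL_ID", 1L, 1000L, 4, prop)
partDF.show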
Calling user-defined functions (UDFs), method 1:
scala> import org.apache.spark.sql.functions
scala> val lower=functions.udf((x:String)=>{x.toLowerCase}) // convert to lowercase
scala> val df=Seq("ABc","DEF").toDF("name")
scala> df.show
+----+
|name|
+----+
| ABc|
| DEF|
+----+
scala> df.select(lower(df("name"))).show
+---------+
|UDF(name)|
+---------+
|      abc|
|      def|
+---------+
Method 2
scala> import org.apache.spark.sql.SparkSession
scala> spark.udf.register("upper",(x:String)=>{x.toUpperCase})
scala> spark.sql("""select upper("abc")""").show