微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark_sql

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bBx0fy0y-1611723184476)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210126202837264.png)]

$“age”+1,'age+1,column(“age”)+1,col(“age”)

中$,`,col,和colum等价

DSL

3.4.1.1 第一步:创建文本文件

在linux的/export/servers/路径下创建文本文件

cd /export/servers/

vim person.txt

写入文件

1 zhangsan 20

2 lisi 29

3 wangwu 25

4 zhaoliu 30

5 tianqi 35

6 kobe 40

3.4.1.2 第二步:定义RDD

使用spark-shell 进入spark客户端

cd /export/servers/spark-2.2.0-bin-2.6.0-cdh5.14.0/

bin/spark-shell --master local[2]

val lineRDD = sc.textFile(“file:///export/servers/person.txt”).map(x => x.split(" "))

3.4.1.3 第三步:定义case class样例类

case class Person(id:Int,name:String,age:Int)

3.4.1.4 第四步:关联RDD与case class

val personRDD = lineRDD.map(x => Person(x(0).toInt,x(1),x(2).toInt))

3.4.1.5 第五步:将RDD转换成DF

val personDF = personRDD.toDF

注意:DF也可以转换成为RDD,直接使用DF调用rdd方法即可

scala> personDF.rdd.collect

res38: Array[org.apache.spark.sql.Row] = Array([1,zhangsan,20], [2,lisi,29], [3,wangwu,25], [4,zhaoliu,30], [5,tianqi,35], [6,kobe,40])

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PUWmnwKv-1611723184478)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210127125124494.png)]

  • DataSet:DataFrame+泛型

  • 问题:如果现在需要单词切分,String类型的数据才可以切分,如果DataFrame不知道泛型,不知道是Person,String

  • 案例:使用两种API实现WordCount

  • 需求:

  • 步骤:

    • 1-准备SparkSession环境
    • 2-读取数据
    • 3-扁平化数据FlatMap
    • 4-执行统计
package cn.itcast.sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

/**
 * DESC:
 * 1-准备SparkSession环境
 * 2-读取数据
 * 3-扁平化数据FlatMap
 * 4-执行统计
 */
object _10DSLsqlWordCount {
  def main(args: Array[String]): Unit = {
    //1-准备SparkSession环境
    val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder()
      .config(conf)
      .getorCreate()
    val sc: SparkContext = spark.sparkContext
    sc.setLogLevel("WARN")
    import spark.implicits._
    //2-读取数据
    //val valueRDD: RDD[String] = sc.textFile("data/input/words.txt")
    val df: DataFrame = spark.read.text("data/input/words.txt")
    //spark.read.textFile("data/input/words.txt")
    //3-扁平化数据FlatMap
    val dataDS: Dataset[String] = df.as[String]
    val strDS: Dataset[String] = dataDS.flatMap(x => x.split("\\s+"))
    //4-执行统计'---------DSL
    println("======================DSL-=========================")
    strDS.printSchema()
    //root
    //|-- value: string (nullable = true)
    strDS.groupBy($"value")
      .count()
      .orderBy($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      .count()
      .sort($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      .agg(count("value").alias("count"))
      .sort($"count".desc)
      .limit(5)
      .show()
    //
    strDS.groupBy($"value")
      //Map(
      //*"age" -> "max",
      //*"expense" -> "sum"
      //*)
      .agg(Map("value" -> "count"))
      .sort($"count(value)".desc)
      .limit(5)
      .show()
    //sql
    println("======================sql-=========================")
    strDS.createOrReplaceTempView("table_view")
    spark.sql(
      """
        |select value,count(value) as count
        |from table_view
        |group by value
        |order by count desc
        |limit  5
        |""".stripMargin).show()

    spark.stop()
  }
}

RDD-DF-DS之间装换

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**

  • DESC:
    */
    case class People2(id: Int, name: String, age: Int)

object 09RddDFDS {
def main(args: Array[String]): Unit = {
//1-准备SparkSession
val conf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster(“local[*]”)
val spark: SparkSession = SparkSession.builder()
.config(conf)
.getorCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel(“WARN”)
//2-读取数据文件
val rdd1: RDD[String] = sc.textFile(“data/input/sql/people1.txt”)
//3-转化数据为DataFrame
val peopleRDD: RDD[People2] = rdd1.map(
.split("\s+")).map(filed => People2(filed(0).toInt, filed(1), filed(2).toInt))
//这里在RDD转换到DF过程中需要引入隐式转换
import spark.implicits._
//1-从RDD转化为DF----4种
val peopleDF: DataFrame = peopleRDD.toDF()
//2-从DF转RDD
peopleDF.rdd.collect().foreach(println())
//[1,zhangsan,20]
//[2,lisi,29]
//3-RDD转DS
val dataDS: Dataset[People2] = peopleRDD.toDS()
//±–±-------±–+
//| id| name|age|
//±–±-------±–+
//| 1|zhangsan| 20|
//| 2| lisi| 29|
//| 3| wangwu| 25|
//4-DS转RDD
dataDS.rdd.collect().foreach(println(
))
//People2(1,zhangsan,20)
//People2(2,lisi,29)
//People2(3,wangwu,25)
//5-DF-DS
val peopleDS: Dataset[People2] = peopleDF.as[People2]
peopleDS.show()
//6-DS-DF
peopleDS.toDF().show()

//4-关闭SparkSession
spark.close()

}
}

UDF编程

本质不难:也就是一个函数,实现一个功能:一对一的关系,类似于map作用

使用的步骤:

通过session创建udf.register(函数名称,函数中每个变量:变量的类型)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0WNH7j5v-1611723184480)(C:\Users\14112\AppData\Roaming\Typora\typora-user-images\image-20210127123539258.png)]

有多少个变量 就选择选择哪个udf多少,例如有两个变量就选择UDF2

spark.udf.register()这是固定格式

//通过new的方式写函数

spark.udf.register("smalltoupper2", new UDF1[String,String] {
  override def call(t1: String): String =
    t1.toupperCase()
},DataTypes.StringType)

//通过lambam表达式

//2创建lamba表达式函数
spark.udf.register("smalltoupper1",(a1:String)=>{
  a1.toupperCase()
})
spark.udf.register("smalltoupper2", new UDF1[String,String] {
  override def call(t1: String): String =
    t1.toupperCase()
},DataTypes.StringType)

//通过lambam表达式

//2创建lamba表达式函数
spark.udf.register("smalltoupper1",(a1:String)=>{
  a1.toupperCase()
})

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐