微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

基于spark2的dataFrame和dataSet


文章目录


dataFrame

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("sqlTest01")
      .master("local[*]")
      .getorCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

dataFrame wordCount

package sql2

import org.apache.spark.sql.SparkSession

object Spark2WoldCount {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("Spark2WoldCount")
      .master("local[*]")
      .getorCreate()
    val lines = spark.read.textFile("")

    //整理数据,压平.导入隐式转换
    import spark.implicits._
    val words = lines.flatMap(_.split(" "))

    //注册视图
    words.createTempView("v_wc")

    //执行sql
    val result = spark.sql("select value,COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC")
    result.show()
    spark.stop()
  }
}

基于dataSet的wordCount

package sql2

import org.apache.avro.generic.GenericData.StringType
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession, types}

object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("sqlTest01")
      .master("local[*]")
      .getorCreate()
//    创建RDD
    val lines = session.sparkContext.textFile("")
//    将数据进行整理
    val rowRDD = lines.map(line => {
      val fields = line.split(",")
      val id = fields(0).toLong
      val name = fields(1)
      Row(id, name)
    })
//    结果类型,其实就是表头,用于描述DataFrame,true表示是否为空
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    //创建DataFrame
    val df = session.createDataFrame(rowRDD,sch)

    import session.implicits._
    val df2 = df.where($"id">8).orderBy($"id" desc,$"age" asc)

//    df.show()
    //写入成csv文件.json,parquet,jdbc等
    df2.write.parquet("")
    session.stop()

  }
}

               

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐