微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

idea开发SparkSQL程序

首先导入maven依赖

    <!-- Spark artifacts carry the Scala binary version as a suffix (_2.11 here);
         it should match the Scala version the project is compiled with. -->
    <dependencies>
        <!-- Spark core: SparkContext and the RDD API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- Spark SQL: SparkSession and the DataFrame / Dataset API -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

 

 

DataFrame

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}


/**
 * DataFrame walkthrough: builds a DataFrame from an RDD of tuples, queries it
 * with SQL through a temporary view, converts it to a typed Dataset and back
 * to an RDD of [[Row]]s, then prints every row.
 */
object Demo1 {

  /** Record type used when viewing the DataFrame as a typed Dataset. */
  case class User(name: String, age: Int)

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // A SparkSession is obtained from the builder on its companion object,
    // not via `new`; master("local[*]") runs Spark in local mode.
    val spark = SparkSession
      .builder()
      .appName("Spark sql basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // try/finally so the session is stopped even if one of the steps throws.
    try {
      // Brings the implicit conversions (toDF, encoders for as[...]) into scope.
      import spark.implicits._

      val raw: RDD[(String, Int)] =
        spark.sparkContext.makeRDD(List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))

      // Name the tuple fields so they can be referenced as columns.
      val df: DataFrame = raw.toDF("name", "age")

      df.show()

      // Register the DataFrame as a temporary view so it can be queried with SQL.
      df.createOrReplaceTempView("user")

      // Run a SQL query against the view.
      val selectUser: DataFrame = spark.sql("select * from user where age > 21")

      selectUser.show()

      // Convert to a typed Dataset; columns are matched to User's fields by name.
      val ds: Dataset[User] = df.as[User]

      // Convert back to an RDD of untyped rows.
      val rdd: RDD[Row] = df.rdd

      // Bring the (small) result to the driver before printing: iterating the
      // RDD directly runs println inside parallel tasks, which can interleave
      // the name and age lines of different rows.
      // Row fields are indexed from 0.
      rdd.collect().foreach { row =>
        println(row.getString(0))
        println(row.getInt(1))
      }
    } finally {
      spark.stop()
    }
  }

}

注意:

1)sparksession的创建不能用new SparkSession的方式,而是利用伴生对象SparkSession来创建builder,通过builder来创建sparksession。

2)隐式转换import spark.implicits._不是引入了一个包,spark指的是程序上下文环境中的sparksession对象,这里引入的是该对象内部的implicits对象,_表示导入implicits对象中的所有成员(即各种隐式转换和Encoder)。如果该对象改名为sparksession,相应的隐式转换语句变为import sparksession.implicits._

3)row对象的get方法的下标索引是从0开始,而不像jdbc的resultset下标索引从1开始

 

Dataset

 

package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}


/**
 * Dataset walkthrough: builds a typed Dataset from a local sequence of case
 * class instances, converts it to a DataFrame and to an RDD, then prints
 * every record.
 */
object Demo2 {

  /** Record type stored in the Dataset. */
  case class People(name: String, age: Int)

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {

    // A SparkSession is obtained from the builder on its companion object,
    // not via `new`; master("local[*]") runs Spark in local mode.
    val spark = SparkSession
      .builder()
      .appName("Spark sql basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // try/finally so the session is stopped even if one of the steps throws.
    try {
      // Brings the implicit conversions (toDS and the People encoder) into scope.
      import spark.implicits._

      // Create a typed Dataset from a local sequence.
      val peopleDataset =
        Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

      // Convert to an untyped DataFrame.
      val peopleDataframe: DataFrame = peopleDataset.toDF()

      peopleDataframe.show()

      // Convert to an RDD; the element type People is preserved.
      val rdd: RDD[People] = peopleDataset.rdd

      // Bring the (small) result to the driver before printing so the output
      // comes from a single thread in a stable order.
      rdd.collect().foreach { people =>
        println(s"${people.name}\t${people.age}")
      }
    } finally {
      spark.stop()
    }
  }
}

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐