First, import the Maven dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
DataFrame
package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Demo1 {
  // Case class describing a row
  case class User(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // Create the SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark sql basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // Implicit conversions (toDF, as[...], etc.)
    import spark.implicits._

    val raw: RDD[(String, Int)] = spark.sparkContext.makeRDD(
      List(("zhangsan", 21), ("lisi", 22), ("wangwu", 23)))
    val df: DataFrame = raw.toDF("name", "age")
    df.show()

    // Register a temporary view
    df.createOrReplaceTempView("user")

    // Run a SQL statement against the view
    val selectUser: DataFrame = spark.sql("select * from user where age > 21")
    selectUser.show()

    // Convert to a Dataset
    val ds: Dataset[User] = df.as[User]

    // Convert back to an RDD
    val rdd: RDD[Row] = df.rdd

    // Iterate over the RDD (the for loop desugars to foreach,
    // which runs on the executors)
    for (row <- rdd) {
      println(row.getString(0))
      println(row.getInt(1))
    }

    spark.stop()
  }
}
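The same query can also be written with the DataFrame DSL instead of a SQL string, which skips the temporary view entirely. A minimal sketch, assuming the `df` built above and `import spark.implicits._` are in scope (the `$"..."` column syntax comes from that import):

// Equivalent to: select * from user where age > 21
df.filter($"age" > 21).show()

// Explicit column projection plus a filter
df.select($"name", $"age")
  .where($"age" > 21)
  .show()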
Notes:
1) A SparkSession is not created with `new SparkSession`. Instead, call `builder()` on the SparkSession companion object and let the builder create the session, as in the code above.
2) `import spark.implicits._` does not import a package. Here `spark` refers to the SparkSession object in the surrounding context, so the statement imports all members of that session's `implicits` object (the `_` is a wildcard covering all of them). If the session variable were renamed `sparksession`, the import would become `import sparksession.implicits._`.
3) The index passed to Row's get methods starts at 0, unlike JDBC's ResultSet, where column indexes start at 1. Fields can also be read by column name, as sketched below.
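A minimal sketch of name-based access, assuming the same `rdd: RDD[Row]` from Demo1 is in scope; `getAs[T]` is the standard Row method for looking up a field by column name:

for (row <- rdd) {
  // Look up fields by column name instead of positional index
  val name = row.getAs[String]("name")
  val age  = row.getAs[Int]("age")
  println(s"$name\t$age")
}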
Dataset
package sparksql

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo2 {
  // Case class describing a row
  case class People(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    // Create the SparkSession and set the application name
    val spark = SparkSession
      .builder()
      .appName("Spark sql basic example")
      .config("spark.some.config.option", "some-value")
      .master("local[*]")
      .getOrCreate()

    // Implicit conversions (toDS, toDF, etc.)
    import spark.implicits._

    // Create a Dataset directly from a Seq of case-class instances
    val peopleDataset =
      Seq(People("zhangsan", 20), People("lisi", 21), People("wangwu", 22)).toDS()

    // Convert to a DataFrame
    val peopleDataframe: DataFrame = peopleDataset.toDF()
    peopleDataframe.show()

    // Convert to an RDD (of typed People objects, not Rows)
    val rdd: RDD[People] = peopleDataset.rdd

    // Iterate over the RDD
    for (people <- rdd) {
      println(people.name + "\t" + people.age)
    }

    spark.stop()
  }
}
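Because a Dataset is typed, transformations can take ordinary Scala functions over the case class instead of column expressions, so field access is checked at compile time. A minimal sketch, assuming the `peopleDataset` from Demo2 and `import spark.implicits._` (which supplies the encoders) are in scope:

// Typed filter and map: the lambdas receive People objects
val adults = peopleDataset.filter(_.age > 20) // Dataset[People]
val names  = peopleDataset.map(_.name)        // Dataset[String]

adults.show()
names.show()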