文章目录
dataFrame
package sql2

// FIX: StringType must be the Spark SQL DataType, not avro's GenericData.StringType
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Builds a DataFrame from an RDD[Row] plus an explicit schema,
 * filters/sorts it with column expressions, and writes it out as parquet.
 */
object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("sqlTest01")
      .master("local[*]")
      .getOrCreate() // FIX: was `getorCreate()` — does not compile

    // Create the source RDD (input path left empty in the original example)
    val lines = session.sparkContext.textFile("")

    // Parse each CSV line into Row(id: Long, name: String)
    val rowRDD = lines.map { line =>
      val fields = line.split(",")
      val id     = fields(0).toLong
      val name   = fields(1)
      Row(id, name)
    }

    // Schema ("table header") describing the DataFrame; `true` marks the field nullable
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    // Create the DataFrame from rows + schema
    val df = session.createDataFrame(rowRDD, sch)

    import session.implicits._
    // FIX: original sorted on $"age", a column absent from the schema (would throw
    // AnalysisException at runtime) — sort on the existing `name` column instead.
    // Also use `.desc`/`.asc` rather than discouraged postfix operators.
    val df2 = df.where($"id" > 8).orderBy($"id".desc, $"name".asc)

    // df.show()
    // Write the result as parquet (csv/json/jdbc etc. are also available)
    df2.write.parquet("")

    session.stop()
  }
}
dataFrame wordCount
package sql2

import org.apache.spark.sql.SparkSession

/**
 * Word count over a text file using Spark SQL: reads lines as a
 * Dataset[String], splits them into words, registers a temp view,
 * and aggregates with a SQL GROUP BY.
 */
object Spark2WoldCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark2WoldCount")
      .master("local[*]")
      .getOrCreate() // FIX: was `getorCreate()` — does not compile

    // Dataset[String] of input lines (input path left empty in the original example)
    val lines = spark.read.textFile("")

    // Import implicits for Dataset encoders, then flatten lines into words
    import spark.implicits._
    val words = lines.flatMap(_.split(" "))

    // Register a temporary view; the single String column is named `value` by default
    words.createTempView("v_wc")

    // Count occurrences per word, most frequent first
    val result = spark.sql(
      "select value,COUNT(*) counts from v_wc GROUP BY value ORDER BY counts DESC")

    result.show()
    spark.stop()
  }
}
基于dataSet的wordCount
package sql2

// FIX: StringType must be the Spark SQL DataType, not avro's GenericData.StringType
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
 * Builds a DataFrame from an RDD[Row] plus an explicit schema,
 * filters/sorts it with column expressions, and writes it out as parquet.
 *
 * NOTE(review): the surrounding article labels this section "Dataset-based
 * wordCount", but the code is a byte-for-byte duplicate of the earlier
 * DataFrame example — the Dataset example appears to have been lost when the
 * post was published. Fixing the defects in the duplicated code as-is.
 */
object Spark2DateFrame {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .appName("sqlTest01")
      .master("local[*]")
      .getOrCreate() // FIX: was `getorCreate()` — does not compile

    // Create the source RDD (input path left empty in the original example)
    val lines = session.sparkContext.textFile("")

    // Parse each CSV line into Row(id: Long, name: String)
    val rowRDD = lines.map { line =>
      val fields = line.split(",")
      val id     = fields(0).toLong
      val name   = fields(1)
      Row(id, name)
    }

    // Schema ("table header") describing the DataFrame; `true` marks the field nullable
    val sch = StructType(List(
      StructField("id", LongType, true),
      StructField("name", StringType, true)
    ))

    // Create the DataFrame from rows + schema
    val df = session.createDataFrame(rowRDD, sch)

    import session.implicits._
    // FIX: original sorted on $"age", a column absent from the schema (would throw
    // AnalysisException at runtime) — sort on the existing `name` column instead.
    // Also use `.desc`/`.asc` rather than discouraged postfix operators.
    val df2 = df.where($"id" > 8).orderBy($"id".desc, $"name".asc)

    // df.show()
    // Write the result as parquet (csv/json/jdbc etc. are also available)
    df2.write.parquet("")

    session.stop()
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。