Spark对数据的读入和写出操作
数据存储在文件中
val spark = SparkSession.builder()
.master("local[6]")
.appName("reader1")
.getorCreate()
CSV类型文件
简单介绍:逗号分隔值(Comma-Separated Values,CSV,有时也称为字符分隔值,因为分隔字符也可以不是逗号),其文件以纯文本形式存储表格数据(数字和文本)
spark.read
.format("csv")
.load("data/BeijingPM20100101_20151231.csv")
.show()
spark.read
.csv("data/BeijingPM20100101_20151231.csv")
.show()
spark.read
.option("header", true)
.option("inferSchema", true)
.csv("data/BeijingPM20100101_20151231.csv")
.show()
spark.read
.option("header", true)
.option("inferSchema", true)
.csv("data/BeijingPM20100101_20151231.csv")
.show()
JSON类型文件
- json数据一般都是用来存储对象数据
- json格式的数据文件读取和csv很相似,具体如下所示:
df.write.json("data/Beijing_pm2.json")
df.write.format("json").save("data/Beijing_pm2.json")
val df = spark.read
.option("header",true)
.csv("data/BeijingPM20100101_20151231.csv")
.toJSON
Parquet操作
val df = spark.read.option("header", true).csv("data/BeijingPM20100101_20151231.csv")
//将数据写为parquet格式
df.write
.mode(SaveMode.Overwrite)
.format("parquet")
.save("data/beijing_pm3")
-
我们发现在保存parquet文件时与之前相比多了一个
mode
,这是为了预防parquet保存路径已经有其他的文件存在,他一共有四种模式分别是:报错、重写、忽略和追加,默认时报错 -
parquet文件保存并不是一个文件,而是存为一个文件夹,如下图所示
-
那我们该如何读取parquet类型的文件呢?
spark.read
.parquet("data/beijing_pm3")
分区操作
分区就是将数据以某一列为标准来进行分类,每一个类都是存为一个文件夹
- 分区操作的读写操作如下所示
val df = spark.read
.option("header", true)
.csv("data/BeijingPM20100101_20151231.csv")
//写文件,表分区
df.write
.partitionBy("year", "month")
.save("data/beijing_pm4")
//读文件,自动发现分区
spark.read
.parquet("data/beijing_pm4")
.printSchema()
partitionBy
时标注以那一列来进行分区,分区的效果如下所示
- 在读取分区表的时候不仅仅可以读取整表,还可以细分到某一个分区内,如读取2010年1月的数据,但是这样就会导致列的减少,会少年和月两列
spark.read
.parquet("data/beijing_pm4/year=2010/month=1")
.show()
数据存储在Hive表中
具体代码如下
//1、创建SparkSession
val spark = SparkSession.builder()
.appName("Hive")
.enableHiveSupport()
//填写hive有关的一些信息(端口号、数据库地址等)
.config("hive.metastore.uris", "a://node1:9000")
.config("spark.sql.warehouse.dir", "/dataset/hive")
.getorCreate()
//读取数据
import spark.implicits._
//hive表的每一列的数据格式
val schema = StructType {
List(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("gpa", FloatType)
)
}
//读取hdfs上的数据存入到hive中
val df = spark.read
//分割符时制表符
.option("delimited", "\t")
//文件的schema就是上面定义的
.schema(schema)
//文件在hdfs中的路径
.csv("hdfs:///dataset/student.csv")
val result = df.where('age > 50)
//写入数据
result.write.mode(SaveMode.Overwrite).saveAsTable("spark.student")
数据存储在MysqL中
//1、创建SparkSession
val spark = SparkSession.builder()
.appName("MysqL")
.master("local[6]")
.getorCreate()
//读取数据
import spark.implicits._
val schema = StructType {
List(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("gpa", FloatType)
)
}
val df = spark.read
.option("delimiter", "\t")
.schema(schema)
.csv("hdfs:///dataset/student.csv")
val result = df.where('age > 50)
//写入数据
result.write
.format("jdbc")
.option("url", "jdbc:MysqL://node1:3306/spark")
.option("dbtable","student")
.option("user","spark")
.option("password","123456")
.mode(SaveMode.Overwrite)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。