1.说明
* 1.Rdd文件读取和保存 可以从两个角度来区分 * 文件格式 : text、json、csv、sequence文件、Object序列化文件 * 文件系统 : 本地文件系统、hdfs、hbase、各类型数据库
2.Spark中怎样读取&保存text文件?
1.读取
* 1.SparkContext.textFile * 语法 : * def textFile(path: String,minPartitions: Int = defaultMinPartitions) * : RDD[String] * 功能 : * 从hdfs、本地文件系统(任何节点可用),或者任何文件系统中读取数据 * 并返回 Rdd[String]对象(文本文件的每一行都会成为Rdd的一个元素) * 参数 : * path : 文件系统路径,可用是hdfs的路径(多路径时用,分割) * minPartitions : Rdd的最小分区数 * 返回值 : * 返回一个Rdd,文本文件的每一行都会成为Rdd的一个元素 * note : * 文本文件的编码必须是UTF-8 * *2.SparkContext.wholeTextFiles * 语法 : * def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions) * : RDD[(String, String)] * 功能 : * 从hdfs、本地文件系统(任何节点可用),或者任何文件系统中读取数据,每个文件都会作为单个记录读取 * 并返回 RDD[(String, String)]对象(key:文件路径名称,value:文件内容) * 参数 : * path : 文件系统路径,可用是hdfs的路径(多路径时,用,分割) * minPartitions : Rdd的最小分区数 * 返回值 : * 返回一个Rdd,文本文件的每一行都会成为Rdd的一个元素
2.保存
* 1.rdd.saveAsTextFile * 语法 : * def saveAsTextFile(path: String): Unit = withScope * def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit * 功能 : * 将rdd保存为指定目录下的text * 参数 : * path : 文件系统路径,可用是hdfs的路径 * codec : 压缩算法实现类
3.示例
object textTest extends App { val sparkconf: SparkConf = new SparkConf().setMaster("local").setAppName("distinctTest") val sc: SparkContext = new SparkContext(sparkconf) //1.读取指定路径下的text文件,返回rdd,每一行为rdd的一个元素 private val rdd1: RDD[String] = sc.textFile("Spark_319/src/data/input2") //2.读取指定路径下的text文件,返回rdd,每个文件作为rdd的一个元素(key:文件路径,value:文件内容) private val rdd2: RDD[(String, String)] = sc.wholeTextFiles("Spark_319/src/data/input2") rdd1.collect().foreach(println(_)) println("***********************") rdd2.collect().foreach(println(_)) //3.保存rdd到指定目录 rdd1.saveAsTextFile("Spark_319/src/data/01") rdd2.saveAsTextFile("Spark_319/src/data/02") sc.stop() }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。