微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark 1.6 和spark 2.0读写csv文件

如果是spark1.6.0请添加maven:

        <dependency>
            <groupId>com.databricks</groupId>
            <artifactId>spark-csv_2.10</artifactId>
            <version>1.4.0</version>
            <scope>compile</scope>
        </dependency>

package com.egridcloud.spark
import org.apache.spark.sql.{DataFrame, sqlContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
  * Created by LHX on 2018/3/20 13:26.
  */
object SparkReadFile {
  def main(args: Array[String]): Unit = {
    val localpath="D:\\input\\word.csv"
    val outpath="D:\\output\\word2"
    val conf = new SparkConf()
    conf.setAppName("SparkReadFile")
    conf.setMaster("local")
    val sparkContext = new SparkContext(conf)
    val sqlContext = new sqlContext(sparkContext)
    //读csv文件
    val data: DataFrame = sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "false") //在csv第一行有属性"true",没有就是"false"
      .option("inferSchema", true.toString) //这是自动推断属性列的数据类型
      .load(localpath)
//    data.show()
    // 写csv文件
    data.repartition(1).write.format("com.databricks.spark.csv")
      .option("header", "false")//在csv第一行有属性"true",没有就是"false"
      .option("delimiter",",")//认以","分割
      .save(outpath)
    sparkContext.stop()
  }
}

如果是spark2.0+就不用添加maven了,因为spark2.0内部集成了读写csv文件

val data = spark.read
.option("inferSchema", "true")
.option("header", "false") //这里设置是否处理头信息,false代表不处理,也就是说文件的第一行也会被加载进来,如果设置为true,那么加载进来的数据中不包含第一行,第一行被当作了头信息,也就是表中的字段名处理了
.csv(s"file:///home/spark/file/project/visit.txt")  //这里设置读取的文件,${i}是我引用的一个变量,如果要在双引号之间引用变量的话,括号前面的那个s不能少
.toDF("mac", "phone_brand", "enter_time", "first_time", "last_time", "region", "screen", "stay_time") //将读进来的数据转换为DF,并为每个字段设置字段名

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐