
Parsing CSV into a DataFrame / DataSet with Apache Spark and Java

I'm new to Spark, and I want to use group-by & reduce to compute the following from a CSV (one line per employee):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

I would like to simplify the CSV, grouping by Department, Designation, and State, with the additional columns sum(costToCompany) and TotalEmployeeCount.

The result should be:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

Is there a way to achieve this using transformations and actions, or should we go with RDD operations?

Solution:

Procedure

>Create a class (schema) to encapsulate your structure (it is not required for approach B, but it will make your code easier to read if you are using Java)

public class Record implements Serializable {
  String department;
  String designation;
  long costToCompany;
  String state;

  public Record(String department, String designation, long costToCompany, String state) {
    this.department = department;
    this.designation = designation;
    this.costToCompany = costToCompany;
    this.state = state;
  }
  // getters and setters
}

>Load the CSV (or JSON) file

JavaSparkContext sc;
JavaRDD<String> data = sc.textFile("path/input.csv");
// JavaSQLContext sqlContext = new JavaSQLContext(sc); // for previous versions
SQLContext sqlContext = new SQLContext(sc); // in Spark 1.3 the Java API and Scala API have been unified


JavaRDD<Record> rdd_records = data.map(
  new Function<String, Record>() {
      public Record call(String line) throws Exception {
         // Here you can use JSON instead:
         // Gson gson = new Gson();
         // gson.fromJson(line, Record.class);
         String[] fields = line.split(",");
         return new Record(fields[0], fields[1],
                           Long.parseLong(fields[2].trim()), fields[3]);
      }
});
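As a side note, on Spark 2.x and later the same load can be done with SparkSession and the built-in CSV reader, skipping the manual split. A minimal sketch, assuming a headerless file; the app name and column names are illustrative assumptions:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("csv-aggregation")   // hypothetical app name
    .master("local[*]")           // assumption: local test run
    .getOrCreate();

// Built-in CSV reader; the sample file has no header row, so name the columns here
Dataset<Row> df = spark.read()
    .option("header", "false")
    .option("inferSchema", "true")
    .csv("path/input.csv")
    .toDF("department", "designation", "costToCompany", "state");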

At this point you have two approaches:

A. Spark SQL

>Register a table (using your defined Schema class)

JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
table.registerAsTable("record_table");
table.printSchema();

>Query the table with your desired group-by query

JavaSchemaRDD res = sqlContext.sql(
  "select department, designation, state, sum(costToCompany), count(*) "
  + "from record_table "
  + "group by department, designation, state");

>Here you can also run any other query you want, using the sql method
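For instance, a minimal sketch of such a follow-up query; the WHERE filter and the tnOnly name are illustrative assumptions, not part of the original answer:

// Hypothetical example: the same aggregation restricted to a single state
JavaSchemaRDD tnOnly = sqlContext.sql(
  "select department, designation, sum(costToCompany), count(*) "
  + "from record_table "
  + "where state = 'TN' "
  + "group by department, designation");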

B. Spark

>Mapping using a composite key: Department, Designation, State

JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD =
  rdd_records.mapToPair(new PairFunction<Record, String, Tuple2<Long, Integer>>() {
    public Tuple2<String, Tuple2<Long, Integer>> call(Record record) {
      return new Tuple2<String, Tuple2<Long, Integer>>(
        record.department + record.designation + record.state,
        new Tuple2<Long, Integer>(record.costToCompany, 1)
      );
    }
});
>reduceByKey on the composite key, summing the costToCompany column and accumulating the number of records per key

JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records =
  records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>,
      Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {
    public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
      return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2 + v2._2);
    }
});
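To go from final_rdd_records to the flat output shown at the top, one more transformation can format each pair as a line of text. The sketch below is an addition of mine, not part of the original answer:

// Sketch: render each (key, (totalCost, empCount)) pair as one CSV line and save.
// Note that the composite key above is a plain concatenation; building it with a
// delimiter (e.g. ";") in the mapToPair step would let you split it back into
// the three original columns here.
JavaRDD<String> csvLines = final_rdd_records.map(
  new Function<Tuple2<String, Tuple2<Long, Integer>>, String>() {
    public String call(Tuple2<String, Tuple2<Long, Integer>> t) {
      return t._1 + "," + t._2._2 + "," + t._2._1; // key, empCount, totalCost
    }
});
csvLines.saveAsTextFile("path/output");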

