微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java-如何在扩展中存储输出

我正在尝试将管道输出到不同的目录,以使每个目录的输出将基于某些ID进行存储.
因此,在普通地图精简代码中,我将使用MultipleOutputs类,并在精简器中执行类似的操作.

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

所以我想一个人可以在烫伤中做到这一点

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

但是我认为您最终会多次重做同一个管道,这会影响整体性能.

还有其他选择吗?

谢谢

解决方法:

是的,使用TemplatedTsv当然是更好的方法.

因此,您的上述代码可以编写如下,

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

这会将来自’some_id的所有记录放入out / some_ids文件夹下的单独文件夹中.

但是,您也可以创建整数存储桶.只需更改最后几行,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

这将创建两个数字文件夹,如out / dd /.您还可以检查templatedTsv API here.

使用templatedTsv可能会有一个小问题,即reducer会生成很多小文件,这可能会对使用您的结果进行下一个工作不利.因此,最好在写入磁盘之前对模板字段进行排序.我写了一个关于它的博客here.

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐