微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark foreachPartition foreach

1.foreach

    val list = new ArrayBuffer()
    myRdd.foreach(record => {
      list += record
    })

2.foreachPartition

    val list = new ArrayBuffer
    rdd.foreachPartition(it => {
      It.foreach(r => {
        list += r
      })
    })

说明:

foreachPartition属于算子操作,可以提高模型效率。比如在使用foreach时,将RDD中所有数据写Mongo中,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

参考官网的说明:

https://spark.apache.org/docs/latest/streaming-programming-guide.html

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐