微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark RDD算子之foreachPartition

首先,看如下代码

ds.foreachRDD(
    rdd =>{
        // 此处属于rdd外,在driver端执行
        //driver和executor数据传输需要序列化
        rdd.foreach{
            // rdd里面,在executor执行
            case( (a,b) =>{
                val conn: Connection = JDBCUtil.getConnection
                conn.close()
            })
        }
    }
)

在如上代码情况下,rdd中每一条数据处理时都会创建连接,有问题。

但是如果放在foreach外面,因为foreach是RDD的算子,算子之外的代码是在Driver端执行,算子内的代码是在Executor端执行,这样涉及闭包操作。这样需要将Driver端的数据传递到Executor端,需要将数据序列化。但是数据库的连接对象是不能被序列化的。

 

此时可以用foreachPartition。

foreachPartition算子会以一个分区为单位进行数据处理。

代码改为如下:

ds.foreachRDD(
      rdd => {
        rdd.foreachPartition(
          iter => {
            val conn = JDBCUtil.getConnection
            iter.foreach {
              case (a,b) => {

              }
            }
            conn.close()
          }
        )
    }
)

此时foreachPartition在每一个分区创建一个数据库连接,性能得到提高。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐