微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

大数据开发中如何进行Spark闭包的理解分析

这篇文章将为大家详细讲解有关大数据开发中如何进行Spark闭包的理解分析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

1.从Scala中理解闭包

闭包是一个函数,返回值依赖于声明在函数外部的一个或多个变量。闭包通常来讲可以简单的认为是可以访问一个函数里面局部变量的另外一个函数

如下面这段匿名的函数

val multiplier = (i:Int) => i * 10

函数体内有一个变量 i,它作为函数一个参数。如下面的另一段代码

val multiplier = (i:Int) => i * factor

multiplier 中有两个变量:i 和 factor。其中的一个 i 是函数的形式参数,在 multiplier 函数调用时,i 被赋予一个新的值。然而,factor不是形式参数,而是自由变量,考虑下面代码

var factor = 3  val multiplier = (i:Int) => i * factor

这里我们引入一个自由变量 factor,这个变量定义在函数外面。

这样定义的函数变量 multiplier 成为一个"闭包",因为它引用到函数外面定义的变量,定义这个函数的过程是将这个自由变量捕获而构成一个封闭的函数

完整的例子:

object Test {  
   def main(args: Array[String]) {  
      println( "muliplier(1) value = " +  multiplier(1) )  
      println( "muliplier(2) value = " +  multiplier(2) )  
   }  
   var factor = 3  
   val multiplier = (i:Int) => i * factor  
}

2.Spark中的闭包理解

先来看下面一段代码

val data=Array(1, 2, 3, 4, 5)
var counter = 0
var rdd = sc.parallelize(data)

// ???? 这样做会怎么样
rdd.foreach(x => counter += x)

println("Counter value: " + counter)

首先肯定的是上面输出的结果是0,park将RDD操作的处理分解为tasks,每个task由Executor执行。在执行之前,Spark会计算task的闭包。闭包是Executor在RDD上进行计算的时候必须可见的那些变量和方法在这种情况下是foreach())。闭包会被序列化并发送给每个Executor,但是发送给Executor的是副本,所以在Driver上输出的依然是counter本身,如果想对全局的进行更新,用累加器,在spark-streaming里面使用updateStateByKey来更新公共的状态。

另外在Spark中的闭包还有别的作用,

1.清除Driver发送到Executor上的无用的全局变量等,只复制有用的变量信息给Executor

2.保证发送到Executor上的是序列化以后的数据

比如在使用DataSet时候 case class的定义必须在类下,而不能是方法内,即使语法上没问题,如果使用过json4s来序列化,implicit val formats = DefaultFormats 的引入最好放在类下,否则要单独将这个format序列化,即使你没有使用到它别的东西。

关于大数据开发中如何进行Spark闭包的理解分析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐