微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark 累加器

累加器原理图:

 

 

 累加器创建:

sc.longaccumulator("")                sc.longaccumulator
sc.collectionaccumulator()            sc.collectionaccumulator
sc.doubleaccumulator()                sc.doubleaccumulator


累加器累加:

l.add(1L)

累加器结果获取

l.value

demo

long累加器

spark.sparkContext.setLogLevel("error")

val data=spark.sparkContext.parallelize(List(" "," "," "," "))
//var l=spark.sparkContext.longAccumulator
var l=spark.sparkContext.longAccumulator("test")
data.map(x=>{
l.add(3L)
x
}).count()
//count 函数仅仅用于触发执行
println(l.value)

data循环4次,每次加3,输出结果为12

Double累加器

collection累加器

 

 

 

 

重复累计问题:

val data=spark.sparkContext.parallelize(List(" "," "," "," "))
    //var l=spark.sparkContext.longAccumulator
    var l=spark.sparkContext.longAccumulator("test")
    val res=data.map(x=>{
      l.add(3L)
      x
    })
      res.count()
    //count 函数仅仅用于触发执行
    println(l.value)
    res.collect()
    println(l.value)

 

 连续两次调用了action算子,所以这里累加器进行了两次重复的累加,也就是说,累加器实在遇到action算子的时候才进行累加操作的

正确写法在累加器结束后加入cache

 spark.sparkContext.setLogLevel("error")

    val data=spark.sparkContext.parallelize(List(" "," "," "," "))
    //var l=spark.sparkContext.longAccumulator
    var l=spark.sparkContext.longAccumulator("test")
    val res=data.map(x=>{
      l.add(3L)
      x
    }).cache()
      res.count()
    //count 函数仅仅用于触发执行
    println(l.value)
    res.collect()
    println(l.value)

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐