累加器原理图:
累加器创建:
sc.longaccumulator("") sc.longaccumulator
sc.collectionaccumulator() sc.collectionaccumulator
sc.doubleaccumulator() sc.doubleaccumulator
累加器累加:
l.add(1L)
累加器结果获取:
l.value
demo
long累加器
spark.sparkContext.setLogLevel("error")
val data=spark.sparkContext.parallelize(List(" "," "," "," "))
//var l=spark.sparkContext.longAccumulator
var l=spark.sparkContext.longAccumulator("test")
data.map(x=>{
l.add(3L)
x
}).count()
//count 函数仅仅用于触发执行
println(l.value)
data循环4次,每次加3,输出结果为12
Double累加器
collection累加器
重复累计问题:
val data=spark.sparkContext.parallelize(List(" "," "," "," ")) //var l=spark.sparkContext.longAccumulator var l=spark.sparkContext.longAccumulator("test") val res=data.map(x=>{ l.add(3L) x }) res.count() //count 函数仅仅用于触发执行 println(l.value) res.collect() println(l.value)
连续两次调用了action算子,所以这里累加器进行了两次重复的累加,也就是说,累加器实在遇到action算子的时候才进行累加操作的
正确写法在累加器结束后加入cache
spark.sparkContext.setLogLevel("error") val data=spark.sparkContext.parallelize(List(" "," "," "," ")) //var l=spark.sparkContext.longAccumulator var l=spark.sparkContext.longAccumulator("test") val res=data.map(x=>{ l.add(3L) x }).cache() res.count() //count 函数仅仅用于触发执行 println(l.value) res.collect() println(l.value)
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。