一、累加器
1.累加器的用法
- 通过在驱动器中调用SparkContext.accumulator(initialValue)方法,创建出有初始值的累加器。返回值为org.apache.spark.Accumulator[T]对象,其中T是初始值initialValue的类型。
- Spark闭包里的执行器代码可以使用累加器的+=方法增加累加器的值。
- 驱动器程序可以调用累加器的value属性来访问累加器的值。
- 注意:工作节点上的任务不能访问累加器的值。累加器是一个只写变量。
2.累加器与容错性
- Spark会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。
- 对于要在行动操作中使用的累加器,spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。
- 对于在RDD转化操作中使用的累加器,就不能保证有这种情况了。转化操作中可能会重新更新累加器,并再次发送到驱动器中。
二、广播变量
- Spark会自动把闭包中所有引用到的变量发送到工作节点上。虽然很方便,但也低效。
1.首先,默认的任务发射机制是专门为小任务进行优化的;
2.其次,事实上你可能会在多个并行操作中使用同一个变量。但是Spark会为每个操作分别发送。
- spark广播变量可以让程序高效的向所有节点发送一个较大的只读值,以供一个或多个Spark使用。
- 通过对一个类型T的对象调用SparkContext.broadcast创建出一个broadcast[T]对象。任何可序列化的类型都可以这么实现。
- 通过value属性访问该对象的值。
- 变量会被发送到各节点一次,应做只读处理。
1.广播的优化
- 当广播一个比较大的值时,选择既快又好的序列化格式是很重要的,因为如果序列化对象的时间很长或者传送花费的时间太久,这段时间就很容易称为性能瓶颈。
- spark的scala和java默认使用Java序列化库,它对除基本类型外的任何对象都比较低效。
- 可以选择使用spark.serializer属性选择另一个序列化库来优化序列化过程。
三、基于分区进行操作
- 基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作。
- spark提供基于分区的map和foreach,让你的部分代码只对RDD的每个分区运行一次,这样就可以帮助降低这些操作的代价。
- 在scala中使用共享连接池与JSON解析器
val contactsContactLists = validSigns.distinct().mapPartitions{
signs =>
val mapper = createMapper()
val client = new HttpClient()
//创建http请求
client.start()
signs.map{
//获取响应
createExchangeForSign(sign)
}.map{case (sign,exchange) => (sign,readExchangeCallLog(mapper,exchange))}.filter(x => x._2 != null)//删除空的呼叫日志
}
- 按分区执行的操作符
mapPartitions() f:(Iterator[T]) -> Iterator[U]
mapPartitionsWithIndex() f:(Int,Iterator[T]) -> Iterator[U]
foreachPartitions() f:(Iterator[T]) -> Iterator[U]
四、与外部程序间的管道
五、数值RDD的操作
- spark的数值操作是通过流式算法实现的,允许以每次一个元素的方式构建出模型。这些统计数据都会在调用stats()时通过一次遍历数据计算出来,并以StatsCounter对象返回。
- StatsCounter中可用的汇总统计数据
方法 | 含义 |
---|---|
count() | RDD中元素个数 |
mean() | 元素的平均值 |
sum() | 总和 |
max() | 最大值 |
min() | 最小值 |
variance() | 元素的方差 |
sampleVariance() | 从采样中计算出的方差 |
stdev() | 标准差 |
sampleStdev() | 采样的标准差 |
variance() | 元素的方差 |
- 例:移除异常值
val distanceDouble = distance.map(string => string.todouble)
val stats = distanceDoubles.stats()
val stddev = stats.stdev
val mean = stats.mean
val reasonabledistances = distanceDoubles.filter(x => math.abs(x-mean) < 3 * stddev)
println(reasonabledistances.collect().toList)
扫码关注公众号
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。