微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Spark容错机制

Spark以前的集群容错处理模型,像MapReduce,将计算转换为一个有向无环图(DAG)的任务集合,这样可以通过重复执行DAG里的一部分任务来完成容错恢复。但是由于主要的数据存储在分布式文件系统中,没有提供其他存储的概念,容错过程需要在网络上进行数据复制,从而增加了大量的消耗。所以,分布式编程中经常需要做检查点,即将某个时机的中间数据写到存储(通常是分布式文件系统)中。

Lineage机制

RDD的lineage记录的是粗粒度的特定数据Transformation操作行为。当这个RDD的部分分区数据丢失时,它可以通过lineage获取足够的信息来重新运算和恢复丢失的数据分区。

RDD在lineage依赖方面分为两种:宽依赖和窄依赖来解决数据容错的高效性

·宽依赖:指父RDD的一个分区对应子RDD的多个分区(非全部),也可以父RDD的一个分区对应子RDD所有分区(未经协同划分的join)。

   

 

·窄依赖:父RDD一个分区一对应子RDD一个分区,或者父RDD多个分区对应子RDD一个分区(经过协同划分的join)

       

Spark依赖实现:

abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
    //返回子RDD的partitionId依赖的所有的parent RDD的Partition(s)
    def getParents(partitionId: Int): Seq[Int]
    override def rdd: RDD[T] = _rdd
}

 

窄依赖的one-to-one dependency实现:

class OnetoOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
    override def getParents(partitionId: Int) = List(partitionId)
}

窄依赖的many-to-one dependency实现:

override def getParents(partitionId: Int) = {
    if(partitionId >= outStart && partitionId < outStart + length) {
        List(partitionId - outStart + inStart)
    } else {
        Nil
   
}
}

 

宽依赖的实现:

class ShuffleDependency[K, V, C](
                                    @transient _rdd: RDD[_ <: Product2[K, V]],
                                    val partitioner: Partitioner,
                                    val serializer: Option[Serializer] = None,
                                    val keyOrdering: Option[Ordering[K]] = None,
                                    val aggregator: Option[Aggregator[K, V, C]] = None,
                                    val mapSideCombine: Boolean = false)
    extends Dependency[Product2[K, V]] {

    override def rdd = _rdd.asInstanceOf[RDD[Product2[K, V]]]

    //获取新的shuffleId
    val shuffleId: Int = _rdd.context.newShuffleId()
    //向ShuffleManager注册Shuffle的信息
    val shuffleHandle: ShuffleHandle =
        _rdd.context.env.shuffleManager.registerShuffle(
            shuffleId, _rdd.partitions.size, this)

    _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

 

RDD也是一个DAG,每一个RDD都会记住创建该数据集需要哪些操作,跟踪记录RDD的继承关系,这个关系在Spark里面叫lineage。由于创建RDD的操作是相对粗粒度的变换,即单一的操作应用于许多数据元素,而不需存储真正的数据,该技巧比通过网络复制数据更高效。当一个RDD的某个分区丢失时,RDD有足够的信息记录其如何通过其他RDD进行计算,且只需重新计算该分区,但是Spark的lineage也不是完美解决所有问题的

 

对于窄依赖,只需要通过重新计算丢失的那一块数据来恢复,容错成本较小。但如果是宽依赖,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。

所以,不同的应用有时候也需要在适当的时机设置数据检查点。由于RDD的只读特性使得它比常用的共享内存更容易做检查点,具体可以使用doCheckPoint方法

Checkpoint机制

  • Checkpoint是把内存中变化持久化到一个高可用的分布式系统中,例如hdfs 这时候就可以斩断依赖链,就可以把redo日志删掉了,然后更新下检查点,
  • spark streaming中对于一些有状态的操作,这在某些stateful转换中是需要的,在这种转换中,生成RDD需要依赖前面的batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs保存到可靠存储来切断依赖链。同时必须隔一段时间进行一次checkpoint。

使用checkpoint

sc.setCheckpointDir("/data/streaming/checkpoint")
rdd.checkpoint

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐