微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

SPark关于缓存&坑

SPark关于缓存&坑

SPark基本概念

对于Spark有一定了解的童鞋可以跳过前面基础概念的讲解,直接从下面的缓存部分开始看↓↓↓

Spark执行流程

Spark在执行Transformation类型操作时都不会立即执行,当遇到Action类型操作时,就会触发计算,每个action操作都会触发runJob操作,生成一个Job.在Driver中SparkContext根据RDD之间的依赖关系创建出DAG(有向无环图),DAGScheduler负责解析这个DAG,从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,当碰到一个宽依赖时,就会把这个宽依赖之前的RDD组装成一个stage,再从当前宽依赖开始继续向前找,重复刚才的步骤,以此类推.每个stage由可以并发执行的一组task构成,一个partition对应一个task.DAGScheduler将stage划分完后,会将这组task提交到TaskScheduler去执行,任务会在Executor进程的多个Task线程上执行,完成Task任务后 将结果信息提交到ExecutorBackend中 他会将信息提交给TaskScheduler,TaskScheduler接到消息后通知TaskManager,移除该Task任务,开始执行下一个任务.TaskScheduler同时会将信息同步到TaskSet Manager中一份,全部任务执行完毕后TaskSet Manager将结果反馈给DAGScheduler,如果属于ResultTask 会交给JobListener.否则话全部任务执行完毕后写入数据.

Spark运行架构的特点

每个Application获取专属的Executor进程,该进程在Application期间一直驻留,并以多线程方式运行tasks。这种Application隔离机制有其优势,无论是从调度角度看(每个Driver调度它自己的任务),还是从运行角度看(来自不同Application的Task运行在不同的JVM中)。当前,这也意味着SparkApplication不能跨应用程序共享数据,除非将数据写入到外部存储系统。

几个名词解释

  1. Executor:
    在启动Spark程序时会指定num-executors executor数量,每个executor在yarn上就是一个container 也就是一个JVM
    通过executor-memory指定JVM内存大小

    spark提交命令

    而启动的一个Spark程序就是一个Application

  2. Job
    Spark是通过action算子触发真正的计算,每一个action操作会对应一个Job
    下面就是一个Application中的18个Job,也就是对应代码中的18个action操作

    一个Application中的18个Job

  3. Stage
    如上面所说的,在每个Job中从当前Action方法向前回溯,如果遇到的是窄依赖则应用流水线优化,继续向前找,当碰到一个宽依赖时,就会把这个宽依赖之前的RDD组装成一个stage
    以下就是上面第一个Job内的Stage划分,每个依赖的Stage之间会进行shuffle操作

    action操作

    STAGE切分

  4. Task
    Task可以理解为RDD内的分区,分区数决定了数据将会分散在多少个Task内进行运算
    每个Task内都是相同计算逻辑,而进行计算的数据不同的一个分区
    比如下面就是设置了320个分区,也就是来源数据会分散到320个Task中去计算

    TASK示意图

  5. RDD
    RDD是一个只读的弹性分布式数据集,由于RDD是只读的所以保证了它的数据一致性.每个transformation操作都会生成一个新的RDD,里面记录了它的操作和血缘关系,最后通过action操作触发计算,向前追溯根据shuffle划分stage形成DAG,最终将计算结果输出.

------------------------------基础概念分割线------------------------------

缓存部分

为什么Spark需要Cache机制?在Spark中的作用是什么?
认情况下,RDD只使用一次,用完即扔,再次使用时需要重新计算得到,而持久化操作避免了这里的重复计算.在一些迭代算法中,如果不使用缓冲机制,如果某分区数据丢失,会导致整个计算链重新计算.

也就是说Spark的RDD在每次在内存中计算之后即释放掉,当有一个RDD在第一个Job中计算过之后,这个RDD数据就被释放掉了,而如果后面的Job中可以复用这个RDD的数据时,若不作任何操作,在第二个Job的action操作触发了计算之后
向前追溯执行流程,遇到这个在第一个Job中计算过的RDD后,它会根据计算逻辑把这个RDD再计算一遍
此时我们就可以在第一次计算这个RDD时调用.cache()方法,这样当第一个Job的这个RDD执行完毕后会将这个RDD缓存在内存中而不释放.在第二个Job中再次复用这个RDD时就可以直接取内存中的缓存操作而不会重复计算一遍.

在没有缓存时,生成DAG图的时候第二次遇到这个RDD,会将计算这个RDD的逻辑也组装进来,而缓存后,第二遍遇到这个RDD时会将这个RDD前面的逻辑截断,直接使用缓存的数据.
下面就是两个相同计算逻辑的DAG图,左边是没有缓存的,右边是缓存后的,在右边的执行过程中可以看到多了一个cache的RDD,计算时会直接使用缓存数据而cache之上的计算其实是不会执行的,左边的则会从源数据重新计算一遍

没有显式调用缓存DAG


调用缓存DAG


由于之前了解的每个操作会生成一个新的RDD,所以为了优化性能,只要是复用了的RDD就调用一下cache方法,避免重复计算
而这里有一个误区,就是其实在选品和运营助手项目中,每个RDD内存储的都是一个个对象,如果不缓存,每次重新计算当然是没有问题的,而缓存了之后
下面的维度使用这个缓存在内存中的RDD计算时,其实是会把对象内属性引用的值改变的
在以前使用时,要么不会改变里面的值,要么在后面只对这个RDD进行过一次计算… 这样也是不会有问题的,因为上面使用的RDD已经保存下来了,后面就算改变了也不会去改变前面的结果
但是在这两个项目的多维度计算中:
比如其中一块逻辑是这样的

逻辑示意图1


计算完①web_site_t的值后将计算后的值缓存起来,③web_site,⑤web_t维度的数据都是从缓存了的这个RDD去计算的
计算③web_site维度的值不会有问题,但是当③web_site计算完后,已经将缓存在内存中的①web_site_t维度的RDD内的对象的值引用已经改变了,再使用这个RDD去计算⑤web_t维度时值已经改变了,就会造成数据误差

发现这个问题,确定朝这个方向去探索的原因是,在今天对比了缓存后数据出错与不缓存的正确结果发现,出错的数据就是在前一次改变值之后的值
因为现在的计算逻辑是

逻辑示意图2

先用原始数据去掉国家维度进行聚合计算出①web_site_t维度的数据,再使用原始数据计算含有国家维度的销量数据去计算②web_site_t_cy维度

在ES中保存的结果发现 相同维度(网站=ZF,站点=ZF,端口=PC,SKU=226193105,国家=US)的一条数据中的i7(sku7日销量),i8(sku15日销量),i9(sku30日销量)
与去掉国家维度计算的①web_site_t维度中同一条数据的i7(sku7日销量),i8(sku15日销量),i9(sku30日销量)是完全一样的
而②web_site_t_cy已经是最细粒度,网站=ZF,站点=ZF,端口=PC,SKU=226193105,国家=US的数据在表中只会有一条,对应在Spark程序中计算也就是一个对象
而在①web_site_t聚合计算时刚好就把所有国家的数据聚合累加进了国家=US的这个对象中,所以在②web_site_t_cy维度取数时,这个对象内的值已经被改变成聚合后的结果,导致了数据错误

ES_result1


ES_result2


由于之前一直想着每个RDD生成的都是新的,不会出现这种值改变的情况,而且想着要是会改变的话,数值不会只差这么一点,导致排查这个BUG时走了许多弯路
但是又是因为什么数值只差了一些,而且同spu下只有某些sku的数据有误差呢,我又做了如下试验:
造了五条数据,把数据打印出来

模拟数据


然后模拟项目计算逻辑,根据网站站点终端聚合累加值,计算完后再把结果数据和源数据打印出来

在这里插入图片描述


再根据网站站点终端国家维度分组将结果和源数据打印出来

在这里插入图片描述


再分别使用根据网站站点终端维度聚合后的结果聚合网站站点、网站终端维度数据,并把结果和源数据打印出来

在这里插入图片描述


发现中间源数据结果却是会在计算后改变值,发现同JVM下单线程数据是一定会错的,但在同JVM下多线程,对于Task的划分,在不同的Stage之间会一直从这个Task最初始分配的值计算下去,而每个线程也会对应一个Task(在线程足够的情况下)
所以可能由于数据分配刚好没改变到那条数据的值,这个问题更底层的原因有时间还需要深究一下
而在项目中更是几十个JVM,多线程,且数据量比较大的情况下,才出现了这样的问题
所以在面对大数据问题时,还是不能对自己所了解的东西过于自信,嘻嘻,否则当遇到问题时还是会走许多弯路

在这里插入图片描述


既然产生错误的原因已经查出来了,那么解决方
①对于会出现问题的RDD,不缓存了(这也是之前暴力解决方法
但是显然这个做法是不够好的,每次都重复计算性能可能比较差
一个缓存后的RDD,如果需要被使用两次以上,且会改变对象引用值时,应在最后一次使用之前使用时将对象copy,生成新的对象从而不改变被缓存的值
这样就需要权衡,copy对象而产生大量的对象的影响(包括对象多了以后的GC问题以及复制对象的性能对比重新计算的性能),但在资源足够的情况下大多数情况都是方法2性能更佳,实在不行就只能用方法1了

在改完代码需要使用多次的缓存RDD进行copy之后,对数发现web_t_cy维度的销量数据是正确的,但是流量/环比/订单数据有误差(这部分在web_t_cy没有再次计算直接使用的web_t维度计算好的值)
于是就去查web_t的数据发现确实是在这一步计算就出现了误差,而web_t_cy的销量数据是重新计算覆盖进去的,所以销量数据是对的
我把这个RDD的缓存去掉,跑完数验证了226193105 这个SKU在全维度的数据都是正确的
我还是不死心到底是不是就是这个缓存的问题呢,然后又跑了一次数:下面的test3的索引,发现这个维度下数据又变正确了
而且非常奇怪的事情是第一次跑数错误的时候 i0(7日spu销量) i1(15日spu销量) i2(30日spu销量) i7(1日sku销量) i8(15日sku销量) i9(30日sku销量)
这些数据中,只有30天和15天的销量分别多了2个和1个,也就是说在这个spu下的15个sku中 只有这一个sku的15日销量多了1,30日销量多了30,而7日销量又是正确的(因为SPU销量只是聚合sku聚合后的数据,不涉及这个缓存问题)

在这里插入图片描述


在这里插入图片描述


但是明明根据之前的分析做了copy已经不会改变源数据的引用了,并且为什么7天的数据没有问题 15天 30天又只多了这么一点

令人费解


继续检查test3的索引(有缓存的第二次跑数)的⑦web维度:
首先回顾一下现在的计算逻辑, web维度是从③web_site维度聚合后的结果再进行聚合的,
上面说的问题③web_site和⑤web_t都是由①web_site_t聚合后的结果计算下来的

在这里插入图片描述


test索引是加了缓存第一次跑数 test2是去掉缓存后的跑数(与hive中查出来的数据对过是正确的)
发现就连⑦web维度也和⑤web_t中数据错的一模一样,明明⑦web维度的数据是从③web_site聚合后的结果再去掉site维度进行计算的

在这里插入图片描述


在这里插入图片描述

在这里插入图片描述

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐