如何根据Spark sql explaind中的统计信息深入了解CBO优化,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。
Spark sql 优化器使用两种优化方式:基于规则的和基于代价的。前者依赖于启发式规则,而后者依赖于数据的统计属性。在这篇文章里,我们解释一下在底层这些统计信息是怎么被用到,以及哪些场景下是有用的,并且怎么来使用这些统计信息。
大部分基于启发式的优化规则都没有考虑到被处理的数据属性。比如:基于启发式的PredicatePushDown规则就是基于先过滤再计算的假设。
然而有些场景spark能够通过数据的统计信息来得出更好的计划,这通常被称作基于代价的优化或者CBO,我们来探讨一下细节。
怎么看到统计信息
为了能够看到一个表的统计信息首先我们需要通过运行sql语句来计算(所有的sql语句可以通过使用sql()函数来指定,spark.sql(需要指定的sql字符串)):
ANALYZE TABLE table_name COmpuTE STATISTICS
运行完这个以后,表级别的统计信息就会统计出来并且被存储在元数据中,我们可以通过以下语句来查看:
DESCRIBE EXTENDED table_name
这将会展现一些表属性以及表级别的统计信息。这有两种维度信息:rowCount和sizeBytes:
ANALYZE TABLE table_name COmpuTE STATISTICS FOR COLUMNS col_name DESCRIBE EXTENDED table_name column_name
这将展示给我们类似一下的表(在这个例子中我们使用的列是user_id):
spark.table(table_name).explain(mode='cost')
这个将会给我们展示两种查询计划,物理计划和优化的逻辑计划,该逻辑计划将会展示一些统计信息,就像以下图片展示的:
接下来,我们将会了解叶子节点是这么计算统计信息,以及怎么传播的。
统计信息怎么被计算的
叶子节点计算统计信息有三种方式:第一种也是最好的一种是从元数据中获取的统计信息。第二种是spark将会使用InMemoryFileIndex,他将会调用底层的 Hadoop API去收集数据源中的每个文件的的大小并且求和得到总值sizeInBytes(这里只有sizeInBytes度量会被计算到),最后一种方式是spark将会使用默认的sizeInBytes维度值,该值由spark.sql.defaultSizeInBytes配置 并且该默认值为8EIB,所以基本上spark对于Relation sizeInBytes将会尽可能的进行重新计算覆盖。(这也是只有sizeInBytes这种度量用到),这三种方式可以通过一下图表进行描述:
在图表中,我们有四种条件,第一种决定了统计信息怎么被获取:假如我们读取数据作为一个表df=spark.table(table_name),那我们就进入到左边,否则进入到右边,下一个条件是 是否基于代价的优化(CBO)是否开启,这个通过spark.sql.cbo.enabled配置,默认值是false(到spark 3.0.0).第三个条件是在元数据的统计信息是否通过analyzed table command(ATC)计算出来的,最后一个是表是否分区。 最好的情况是 我们读取数据作为一个表,CBO是开启的,而且已经运行了ATC,这种情况下,所有的统计信息将会从元数据中获取(除了从rowCount计算的sizeInBytes),另一个方面,最坏的情况是,我们读取数据作为一个表,但是ATC没有运行,而且表是分区的,这种情况下默认的sizeInBytes将会从配置中读取,并且计算是很不精确的,注意最坏的情况跟CBO是否开启是无关的。注意一点:假如表不是分区的,spark将会使用Hadoop API计算sizeInBytes,所以表是否分区直接影响了叶子节点的统计信息被计算的方式。
统计信息怎么通过计划被传播的
一旦叶子节点的统计信息被计算出来,该统计信息会被传播到其他节点。这里有两种传播方式:第一种(我们称之为老方式)是非常基本的而且只有一种维度sizeInBytes被传播,并且在各种操作中该维度被调整的的方式也是很基本的。例如,Filter操作并不调整sizeInBytes的值,如下所示:
( spark.table(table_name) .filter(col("user_id") < 0) ).explain(mode="cost")
在这个查询中,我们过滤除了所有user_id是负数的记录,实际上是没有该记录的,但是spark并不能获取这种信息,因为这种需要列级别的统计信息,这再老方式中不会被使用到。所以从这查询计划中可以看到,只有sizeInBytes被传播,并且在两个操作中该值保持不变.
在这种新方式中,为了计算sizeInBytes,spark首先根据每个数据类型计算出单行的大小,之后再乘以rowCount去得到最终的sizeInBytes。假如rowCount是零,则sizeInBytes将会设置为1去避免在其他统计算的除零问题。这也适用于project操作(spark知道哪些列将会被投影,所以需要提前计算出单行的大小)
统计信息怎么被使用
此时我们已经知道了统计信息怎么被计算的以及怎么通过逻辑计划传播的,现在让我们来看一下在查询计划中怎么被使用以获取更优的计划。
这有两个地方统计信息会被使用:第一个是JoinSelection策略,这里spark将会决定使用哪种算法进行join两个DataFrame(更多的细节参考 这里。基本的逻辑就是假如一个df小于某个阈值,spark将会使用BraodcastHashJoin(BHJ),因为假如被广播的df如果很小的话,这将是一个非常有效的方式。这个阈值通过spark.sql.autobroadcastJoinThreshold 配置,默认是10MB,所以对于df的大小有个很好的预估的话,能够帮助我们选择一个更好的join优化短发。
第二个地方也是跟join相关,即joinRecorder规则,使用这个规则 spark将会找到join操作最优化的顺序(如果你join多于两个表的话)。这个规则默认是关闭的,假如你想开启的话,通过如下配置:
spark.conf.set("spark.sql.cbo.joinReorder.enabled",True)
spark.conf.set("spark.sql.cbo.joinReorder.dp.threshold",n)
n的默认值是12。
什么时候使用 ANALYZE TABLE command(ATC)?
我们已经知道假如一个表是分区的,并且我们没有运行ATC,spark将会使用默认的值 8EIB,这是很大的。所以在我们join很多表并且这些表是分区且十分小的情况下,他们是可以进行BHJ的,并且运行ATC是有意义的。当然我们必须清楚,加入一个表的数据被追加或者被覆盖了,之前的统计信息就会被删除,所以我们必须重新运行ATC。在某些情况下,更新元数据的统计信息是比较复杂的。一个解决方法是利用自适应查询-spark 3.0的新特性。
自适应查询
在spark 3.0 自适应查询(AQE)这个新特性被开发,它将会以一种更加高级的方式使用统计信息。假如开启了AQE(默认不开启),在每个stage执行完后,统计信息会被重新计算。这就可以获取更加精确的统计信息,以便能够决定是否使用BHJ,AQE自身是一个很大的主题,我们分几篇文章来介绍它。
看完上述内容,你们掌握如何根据Spark sql explaind中的统计信息深入了解CBO优化的方法了吗?如果还想学到更多技能或想了解更多相关内容,欢迎关注编程之家行业资讯频道,感谢各位的阅读!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。