这篇文章主要介绍了spark 3.0中如何实现查询计划,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。
基本例子
我们考虑一个简单的例子,一个查询中涉及到filter以及aggregation,join操作的语句:
# in PySpark API: query = ( questionsDF .filter(col('year') == 2019) .groupBy('user_id') .agg( count('*').alias('cnt') ) .join(usersDF, 'user_id') )
我们把例子中的usersDF是一组问问题的用户,这些问题用questionsDF来表示。这些问题用year的这一列来进行分区,代表着哪一年问的问题。在这个查询里,我们对2019年问问题的用户感兴趣,并且想知道每个人问了多少问题,而且我们想知道在输出中我们想知道一些额外信息,这就是为什么我们在聚合之后进行了usersDF的join操作。
这里有两种基本的方式去查看物理计划。第一种是在DataFrame上调用explain函数,该函数展现这个计划的文本化的展示:
这在spark 3.0有了一些优化,explain函数带有了一个新参数 mode,这个参数的值可以是:formatted,cost,codegen。使用formatted模式将会把查询计划转化为更加有组织的输出(这里之展现了一部分):
第二种方式是查看spark ui中的sql tab,这里有正在跑的和已经完成了的查询。通过点击你要查看的查询,我们可以看到物理计划的文本表示。在下面这个图片中,我们结合图形表示,文本表示以及它们之间的对应关系:
CollapseCodegenStages
在物理计划的图形表示中,你能看到一些操作被组织成了一大块蓝色的矩形。这些大矩形对应着codegen阶段。这是发生在物理计划的优化阶段。这个是叫做CollapseCodegenStages来负责优化的,原理是把支持代码生成的操作聚合到一起,通过消除虚拟函数的调用来加速。但是并不是所有的操作支持代码生成。所以一些操作(如exchange操作)并不是大矩形的一部分。在我们的例子中,这里有三个codegen stages,对应着三个大矩形,你能在操作的括号中看到codegen stage的id。从这个树我们也可以分辨出一个操作是够支持代码生成,因为加入支持代码生成的话,这里将会在对应的操作的括号里有个星号。
Scan parquet
scan parquet操作代表着从parquet文件中读取数据。从明细信息中,我们能直接看到从这个数据源中我们选择了哪些列。虽然我们没指定具体的字段,但是这里也会应用ColumnPruning规则,这个规则会确保只有真正字段才会从这个数据源中提取出来。我们也能看到有两种filters:PartitionFilters和PushFilters。PartitionFilters应用在数据源分区的字段上。这是非常重要的因为我们能跳过我们不需要的数据。检查对应的filters是否传播到正确的位置总是没错的。这是因为我们尽可能读取少量的数据,因为IO是比较费时的。在spark 2.4,这里还有一个代表实际读取到的分区的partitionCount字段,这个字段在spark 3.0已经去掉了。
PushFilters把字段直接下推到parquet文件中去,假如parquet文件过滤的列是按照过滤字段排序的话,这个规则就很有用了,因为这种情况下,我们能利用parquet内部结构去过滤数据。parquet文件是按照行组和每个行组的元数据文件组成的。这个元数据包含了每个行组的最大最小值,基于这个信息,我们就能判断是否读取这个行组。
Filter
Filter操作佷容易理解。它仅仅是代表过滤条件。但是这个操作怎么创建的并不是很明显,因为在查询中它并不是直接对应着过滤条件。因为所有的filters首先被Catalyst optimzer处理,改规则可能修改或者重新移动她们。这里有好几个规则在她们转换为物理计划前的逻辑计划。我们列举了一下:
PushDownPredicates-这个规则通过其他的操作把filter下推到离数据源更近的地方,但不是所有的操作都支持。比如,如果表达式不是确定性的,这就不行,假如我们使用类似first,last,collect_set,collect_list,rand等,filters操作就不能通过这些操作而进行下推,因为这些函数是不确定性的。
InferFiltersFromConstraints-这个规则实际上会创建新的filter操作,如从join操作(从inner join中创建一个joining key is not null)
PruneFilters-移除多余的filters(比如一个filters总是true)
Exchange
Exchange操作代表着shuffle操作,意味着物理数据的集群范围内的移动。这个操作是很费时的,因为它会通过网络移动数据。查询计划的信息也包含了一些数据重新分区的细节。在我们的例子中,是hashPartitioning(user_id,200):
RangePartitioning-这个用在对数据排序中,用在orderBy或者sort操作中
HashAggregate
这个代表着数据聚合,这个经常是两个操作,要么被Exchange分开或者不分开:
broadcastHashJoin & broadcastExchange
broadcastHashJoin(BHJ)代表着join算法的操作,除了这个,还有SortMergeJoin和ShuffleHashJoin。BHJ总是伴随着broadcastExchange,这个代表着广播shuffle-数据将会收集到driver端并且会被传播到需要的executor上。
ColumnarToRow
这是在spark 3.0引入的新操作,用于列行之间的转换
感谢你能够认真阅读完这篇文章,希望小编分享的“spark 3.0中如何实现查询计划”这篇文章对大家有帮助,同时也希望大家多多支持编程之家,关注编程之家行业资讯频道,更多相关知识等着你来学习!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。