微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

spark的动态分区裁剪下物理计划怎么实现

本篇内容介绍了“spark的动态分区裁剪下物理计划怎么实现”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

背景

本文基于delta 0.7.0 spark 3.0.1 spark 3.x引入了动态分区裁剪,在 spark 的动态分区裁剪上(Dynamic partition pruning)-逻辑计划我们提到在逻辑计划阶段会加入DynamicPruningSubquery,今天我们分析一下在物理阶段怎么对DynamicPruningSubquery进行优化以及实现的

分析

直接转到PlanDynamicPruningFilters的apply方法

override def apply(plan: SparkPlan): SparkPlan = {
    if (!sqlConf.get.dynamicPartitionPruningEnabled) {
      return plan
    }

    plan transformAllExpressions {
      case DynamicPruningSubquery(
          value, buildplan, buildKeys, broadcastKeyIndex, onlyInbroadcast, exprId) =>
        val sparkPlan = QueryExecution.createSparkPlan(
          sparkSession, sparkSession.sessionState.planner, buildplan)
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
        // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = sqlConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
          plan.find {
            case broadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
              left.sameResult(sparkPlan)
            case broadcastHashJoinExec(_, _, _, buildright, _, _, right) =>
              right.sameResult(sparkPlan)
            case _ => false
          }.isDefined

        if (canReuseExchange) {
          val mode = broadcastMode(buildKeys, buildplan)
          val executedplan = QueryExecution.prepareExecutedplan(sparkSession, sparkPlan)
          // plan a broadcast exchange of the build side of the join
          val exchange = broadcastExchangeExec(mode, executedplan)
          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
          val broadcastValues =
            SubquerybroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
          DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
        } else if (onlyInbroadcast) {
          // it is not worthwhile to execute the query, so we fall-back to a true literal
          DynamicPruningExpression(Literal.TrueLiteral)
        } else {
          // we need to apply an aggregate on the buildplan in order to be column pruned
          val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
          val aggregate = Aggregate(Seq(alias), Seq(alias), buildplan)
          DynamicPruningExpression(expressions.InSubquery(
            Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
        }
    }
  }
  1. 如果没有开启动态分区裁剪,则直接跳过

  2. QueryExecution.createSparkPlan( sparkSession, sparkSession.sessionState.planner, buildplan) 通过逻辑计划构造物理计划

  3. 判断是否reuseExchange,如果spark.sql.exchange.reuse配置为true,且存在join的是broadcastHashjoin,而且计算结果和要进行过滤的物理计划的结果一样,则进行下一步,

  • 进行物理计划执行前的准备, 得到executedplan

  • 构建broadcastExchangeExec,broadcastValues,InSubqueryExec,DynamicPruningExpression,broadcastExchangeExec内部就是进行spark的broadcast操作 注意:这里的broadcastExchangeExec会在ReuseExchange规则中被优化, 最终会被broadcastQueryStageExec调用,从而公用同一个broacast的值

  • @H_502_37@
    1. 如果以上不满足,认DynamicPruningExpression(Literal.TrueLiteral),也就是不会进行裁剪

    2. 如果不是broadcastHashjoin,但是能够加速,则按照需要过滤的key做一次聚合,之后再组成DynamicPruningExpression

    至此动态裁剪的物理计划优化就分析完了

    “spark的动态分区裁剪下物理计划怎么实现”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程之家网站,小编将为大家输出更多高质量的实用文章

    版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐