
Data skew? Spark 3.0 AQE has a cure for what ails you

2021-01-23 18:45:54 Wang Zhiwu

Spark 3.0 has been out for half a year now. This major release focuses on performance optimization and richer documentation: 46% of all the optimizations land in Spark SQL, and the most interesting piece of the SQL work is surely Adaptive Query Execution (AQE).


Adaptive Query Execution (AQE) is an adaptive execution engine that Intel's big data technology team and Baidu's big data infrastructure engineers improved and implemented on top of the community version of Spark. In recent years, Spark SQL's optimization work had centered on CBO features, with considerable success.

The basics of CBO

Let's start with a different optimizer: rule-based optimization (Rule-Based Optimization, RBO for short). RBO is an empirical, heuristic approach: the optimization rules are defined up front, and every SQL query is simply run through them. Put simply, RBO is like an experienced old driver who knows all the standard routes.

But some things in this world just don't follow a routine, or rather, there is no routine for them. The classic case is optimizing complex Join operators, where there are usually two multiple-choice questions to answer:

  1. Which algorithm should execute the join: BroadcastJoin, ShuffleHashJoin, or SortMergeJoin? Different strategies make very different demands on system resources and differ greatly in execution efficiency. The same SQL may finish in a few seconds with the right strategy, while the wrong one can OOM the system.

  2. For a snowflake or star schema, in what order should multiple tables be joined? Different join orders mean different execution efficiency. Take A join B join C, where tables A and B are large and C is tiny: A join B clearly needs a lot of system resources to compute, and it won't finish quickly. But with the order A join C join B, A join C produces its result quickly because C is tiny, and that result set is itself small; joining the small result set with B then performs obviously better than the first plan.

Think about it: is there a fixed optimization rule that covers this? No. Bluntly put, you need to know more about the tables themselves (table sizes, total row counts, and so on) and evaluate the cost of the candidate plans before you can choose an optimal one. That is exactly what CBO, the cost-based optimization strategy, does: it computes the cost of every feasible execution plan and picks the cheapest.
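The A/B/C example above can be made concrete with a toy cost model. Note this is purely illustrative: the row counts, the uniform selectivity, and the "cost = rows produced by intermediate joins" metric are made-up assumptions, not Spark's actual CBO formulas.

```python
def join_cost(orders, sizes, selectivity):
    """Toy cost model: cost of a join order = total rows its intermediate joins produce."""
    costs = {}
    for order in orders:
        rows = sizes[order[0]]
        cost = 0
        for table in order[1:]:
            # estimated output rows of joining the running result with the next table
            rows = rows * sizes[table] * selectivity
            cost += rows
        costs[" join ".join(order)] = cost
    return costs

sizes = {"A": 1_000_000, "B": 1_000_000, "C": 100}  # A and B are big, C is tiny
costs = join_cost([("A", "B", "C"), ("A", "C", "B")], sizes, selectivity=1e-6)
best = min(costs, key=costs.get)
print(best)  # -> A join C join B: joining the tiny table first is far cheaper
```

Shrinking the intermediate result early is what makes the second order win, which is precisely the kind of decision a cost-based optimizer automates.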

AQE adjusts and optimizes the entire Spark SQL execution process. Its biggest highlight is that it uses the real, accurate runtime statistics of the plan nodes that have already finished to continuously feed back into, and re-optimize, the remaining execution plan.

CBO is this hard to get right, so how does Spark solve it?

CBO computes statistics over the business data, such as row counts, distinct counts, null counts, and max/min values, to optimize queries. Based on these numbers, Spark automatically chooses between BHJ and SMJ, and performs cost-based join reordering in multi-join scenarios to optimize the execution plan.

However, because these statistics must be collected in advance, they go stale. Judging with outdated data can backfire in some cases and actually drag down SQL execution efficiency.

The AQE framework in Spark 3.0 attacks this problem with three strategies:

  • Dynamically coalescing shuffle partitions
  • Dynamically switching join strategies
  • Dynamically optimizing skew joins

Let's look at each of these three features in detail.

Dynamically coalescing shuffle partitions

When we process very large datasets, shuffle is usually the heaviest drag on performance, because it is an expensive operator that has to move data across the network and distribute it to downstream operators.

In a shuffle, the partition count is critical. The ideal amount of data per partition depends on the data itself, and it can differ greatly between queries and between stages of the same query, so it is hard to settle on one specific number:

  • With too few partitions, each partition holds too much data; large amounts of data may spill to disk, slowing the query down.
  • With too many partitions, each partition holds very little data; that creates a lot of extra network overhead and burdens the Spark task scheduler, again slowing the query down.

To solve this, we set a relatively large initial number of shuffle partitions and let the engine merge adjacent small partitions during execution based on shuffle file statistics.

For example, suppose we run SELECT max(i) FROM tbl GROUP BY j, where tbl has only 2 partitions and very little data. We set the initial shuffle partition count to 5, so after grouping there are 5 partitions. Without AQE, 5 tasks would be launched to aggregate the results, even though 3 of those partitions hold very little data.
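The merging step can be sketched as a simple greedy pass over adjacent partition sizes. This is a minimal illustration, not Spark's exact algorithm; the 64MB target and the partition sizes are made-up numbers chosen to mirror the "5 partitions, 3 of them tiny" example above.

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions until each group nears target_mb."""
    merged, current = [], 0
    for size in sizes_mb:
        if current and current + size > target_mb:
            merged.append(current)  # close the current group before it overflows
            current = 0
        current += size
    if current:
        merged.append(current)
    return merged

# 5 shuffle partitions, 3 of them tiny -> only 3 coalesced reduce tasks
print(coalesce_partitions([60, 2, 3, 1, 70], target_mb=64))  # -> [62, 4, 70]
```

The three tiny middle partitions collapse into one group, which is why AQE can launch 3 reduce tasks instead of 5 in the example.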


In this case, however, AQE generates only 3 reduce tasks.


Dynamically switching join strategies

Spark supports many join strategies, and broadcast hash join is usually the fastest, provided the table on one side of the join fits in memory. For this reason, when Spark estimates that a join table's data volume is below the broadcast size threshold, it switches the join strategy to broadcast hash join. But in many cases this size estimate turns out wrong, for example when there is a highly selective filter.

Because AQE has accurate upstream statistics, it can fix this. In the example here, the right-hand table's actual size is 15M, but after the filter, the data that actually participates in the join is only 8M, below the default 10M broadcast threshold, so it should be broadcast.
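The runtime decision boils down to comparing a size against the broadcast threshold, except AQE gets to use the true post-filter size instead of a pre-execution estimate. A hedged sketch of that decision (the function and its signature are illustrative, not Spark internals; 10MB is Spark's documented default for spark.sql.autoBroadcastJoinThreshold):

```python
BROADCAST_THRESHOLD_MB = 10  # default spark.sql.autoBroadcastJoinThreshold is 10MB

def choose_join_strategy(estimated_mb, actual_mb=None):
    """Pick BHJ when the (runtime-known, if available) build side fits under the threshold."""
    size = actual_mb if actual_mb is not None else estimated_mb
    return "broadcast hash join" if size <= BROADCAST_THRESHOLD_MB else "sort merge join"

# static planning only sees the 15M estimate; AQE sees the real 8M after the filter
print(choose_join_strategy(estimated_mb=15))               # -> sort merge join
print(choose_join_strategy(estimated_mb=15, actual_mb=8))  # -> broadcast hash join
```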


While converting the plan to BHJ mid-execution, we can even optimize the traditional shuffle into a local shuffle (for example, reading shuffle output per mapper rather than per reducer) to reduce network overhead.

Dynamically optimizing skew joins

If a key in a join suffers from data skew, it is basically the performance killer of the whole job. Before AQE, users had no automatic way to handle this thorny skewed-join problem; they had to collect statistics manually out of band and resort to relatively clumsy workarounds such as salting the keys or processing the data in batches.

Data skew is, in essence, data distributed unevenly across the partitions of a cluster, and in a join scenario it slows down the whole query. AQE automatically detects skewed data from shuffle file statistics, breaks the skewed partitions into smaller sub-partitions, and then joins each of them separately.

Consider this scenario: Table A join Table B, where partition A0 of Table A is much larger than the other partitions.


AQE splits partition A0 into 2 sub-partitions and joins each of them with partition B0 of Table B independently.


Without this optimization, SMJ would produce 4 tasks and one of them would take far longer than the others. After the optimization, the join runs 5 tasks, but each task takes roughly the same time, so the whole query performs better.
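The rewrite itself (split the oversized side, join each slice against a copy of the matching partition on the other side, union the pieces) can be sketched in pure Python. This toy operates on in-memory lists of (key, value) rows and is only meant to show that the split join yields exactly the same rows as the unsplit join; it is not Spark's implementation.

```python
def skew_join(a_partition, b_partition, num_splits=2):
    """Join a skewed partition against another by splitting it into sub-partitions."""
    splits = [a_partition[i::num_splits] for i in range(num_splits)]
    b_index = {}
    for key, bval in b_partition:           # build side is replicated for every split
        b_index.setdefault(key, []).append(bval)
    results = []
    for sub in splits:                      # each sub-join could run as its own task
        for key, aval in sub:
            for bval in b_index.get(key, []):
                results.append((key, aval, bval))
    return results

a0 = [("k", i) for i in range(6)]           # the skewed partition A0
b0 = [("k", "x")]                           # the matching partition B0
print(len(skew_join(a0, b0, num_splits=2)))  # -> 6, same rows as an unsplit join
```

Correctness holds because every A-row still meets every matching B-row exactly once; the split only changes how the work is parallelized.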

How to enable AQE

We can set the parameter spark.sql.adaptive.enabled to true to enable AQE (in Spark 3.0 it defaults to false). AQE then applies when the following conditions hold:

  • the query is not a streaming query
  • the query contains at least one exchange (such as a join, aggregation, or window operator) or a subquery

By reducing its dependence on static statistics, AQE successfully resolves the trade-off that plagued Spark CBO (the cost of generating statistics versus query latency) along with the data-staleness problem. Compared with the previously limited CBO, it is now very flexible.

A look at the AQE source code

Adaptive execution is injected into the physical execution plan that Spark generates. In QueryExecution there is a set of preparation rules that optimize the physical plan, and InsertAdaptiveSparkPlan is the first of those optimizers.

InsertAdaptiveSparkPlan first processes some subqueries with the PlanAdaptiveSubqueries rule, then wraps the current plan in an AdaptiveSparkPlanExec.

When collect() or take() is called on AdaptiveSparkPlanExec, the getFinalPhysicalPlan() method runs first to generate the new SparkPlan, and then the corresponding method is executed on that SparkPlan.

// QueryExecution class 
lazy val executedPlan: SparkPlan = {
    executePhase(QueryPlanningTracker.PLANNING) {
      QueryExecution.prepareForExecution(preparations, sparkPlan.clone())
    }
  }

  protected def preparations: Seq[Rule[SparkPlan]] = {
    QueryExecution.preparations(sparkSession,
      Option(InsertAdaptiveSparkPlan(AdaptiveExecutionContext(sparkSession, this))))
  }

  private[execution] def preparations(
      sparkSession: SparkSession,
      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
    // `AdaptiveSparkPlanExec` is a leaf node. If inserted, all the following rules will be no-op
    // as the original plan is hidden behind `AdaptiveSparkPlanExec`.
    adaptiveExecutionRule.toSeq ++
    Seq(
      PlanDynamicPruningFilters(sparkSession),
      PlanSubqueries(sparkSession),
      EnsureRequirements(sparkSession.sessionState.conf),
      ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
        sparkSession.sessionState.columnarRules),
      CollapseCodegenStages(sparkSession.sessionState.conf),
      ReuseExchange(sparkSession.sessionState.conf),
      ReuseSubquery(sparkSession.sessionState.conf)
    )
  }


// InsertAdaptiveSparkPlan 
  override def apply(plan: SparkPlan): SparkPlan = applyInternal(plan, false)

  private def applyInternal(plan: SparkPlan, isSubquery: Boolean): SparkPlan = plan match {
   // ...some checking
    case _ if shouldApplyAQE(plan, isSubquery) =>
      if (supportAdaptive(plan)) {
        try {
          // Plan sub-queries recursively and pass in the shared stage cache for exchange reuse.
          // Fall back to non-AQE mode if AQE is not supported in any of the sub-queries.
          val subqueryMap = buildSubqueryMap(plan)
          val planSubqueriesRule = PlanAdaptiveSubqueries(subqueryMap)
          val preprocessingRules = Seq(
            planSubqueriesRule)
          // Run pre-processing rules.
          val newPlan = AdaptiveSparkPlanExec.applyPhysicalRules(plan, preprocessingRules)
          logDebug(s"Adaptive execution enabled for plan: $plan")
          AdaptiveSparkPlanExec(newPlan, adaptiveExecutionContext, preprocessingRules, isSubquery)
        } catch {
          case SubqueryAdaptiveNotSupportedException(subquery) =>
            logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
              s"but is not supported for sub-query: $subquery.")
            plan
        }
      } else {
        logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} is enabled " +
          s"but is not supported for query: $plan.")
        plan
      }
    case _ => plan
  }

AQE submits and optimizes the plan stage by stage; the process is as follows:

  private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
    // On the first call to getFinalPhysicalPlan, isFinalPlan is false; once the method
    // has finished, no stage will change again, so later calls return the final plan directly.
    if (isFinalPlan) return currentPhysicalPlan

    // In case of this adaptive plan being executed out of `withActive` scoped functions, e.g.,
    // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
    // created in the middle of the execution.
    context.session.withActive {
      val executionId = getExecutionId
      var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
      var result = createQueryStages(currentPhysicalPlan)
      val events = new LinkedBlockingQueue[StageMaterializationEvent]()
      val errors = new mutable.ArrayBuffer[Throwable]()
      var stagesToReplace = Seq.empty[QueryStageExec]
      while (!result.allChildStagesMaterialized) {
        currentPhysicalPlan = result.newPlan
        // Determine which stages to execute next; see createQueryStages(plan: SparkPlan)
        if (result.newStages.nonEmpty) {
          stagesToReplace = result.newStages ++ stagesToReplace
          // onUpdatePlan notifies listeners to update the UI
          executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

          // Start materialization of all new stages and fail fast if any stages failed eagerly
          result.newStages.foreach { stage =>
            try {
              // materialize() submits the stage for execution as a separate job and
              // returns a SimpleFutureAction to receive the execution result
              // QueryStageExec: materialize() -> doMaterialize() ->
              // ShuffleExchangeExec: -> mapOutputStatisticsFuture -> ShuffleExchangeExec
              // SparkContext: -> submitMapStage(shuffleDependency)
              stage.materialize().onComplete { res =>
                if (res.isSuccess) {
                  events.offer(StageSuccess(stage, res.get))
                } else {
                  events.offer(StageFailure(stage, res.failed.get))
                }
              }(AdaptiveSparkPlanExec.executionContext)
            } catch {
              case e: Throwable =>
                cleanUpAndThrowException(Seq(e), Some(stage.id))
            }
          }
        }

        // Wait on the next completed stage, which indicates new stats are available and probably
        // new stages can be created. There might be other stages that finish at around the same
        // time, so we process those stages too in order to reduce re-planning.
        // Block until some stage finishes executing
        val nextMsg = events.take()
        val rem = new util.ArrayList[StageMaterializationEvent]()
        events.drainTo(rem)
        (Seq(nextMsg) ++ rem.asScala).foreach {
          case StageSuccess(stage, res) =>
            stage.resultOption = Some(res)
          case StageFailure(stage, ex) =>
            errors.append(ex)
        }

        // In case of errors, we cancel all running stages and throw exception.
        if (errors.nonEmpty) {
          cleanUpAndThrowException(errors, None)
        }

        // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
        // than that of the current plan; otherwise keep the current physical plan together with
        // the current logical plan since the physical plan's logical links point to the logical
        // plan it has originated from.
        // Meanwhile, we keep a list of the query stages that have been created since last plan
        // update, which stands for the "semantic gap" between the current logical and physical
        // plans. And each time before re-planning, we replace the corresponding nodes in the
        // current logical plan with logical query stages to make it semantically in sync with
        // the current physical plan. Once a new plan is adopted and both logical and physical
        // plans are updated, we can clear the query stage list because at this point the two plans
        // are semantically and physically in sync again.
        // Replace the finished stages in the logical plan with LogicalQueryStage nodes
        val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
        // Re-run the optimizer and planner to re-optimize
        val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
        val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
        val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
        if (newCost < origCost ||
            (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
          logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
          cleanUpTempTags(newPhysicalPlan)
          currentPhysicalPlan = newPhysicalPlan
          currentLogicalPlan = newLogicalPlan
          stagesToReplace = Seq.empty[QueryStageExec]
        }
        // Now that some stages have finished, we can try creating new stages.
        // Next iteration: once a stage completes, its resultOption is set and
        // the corresponding allChildStagesMaterialized property becomes true
        result = createQueryStages(currentPhysicalPlan)
      }

      // Run the final plan when there's no more unfinished stages.
      // All upstream stages are done; optimize the physical plan with their stats
      // and settle on the final physical plan
      currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
      isFinalPlan = true
      executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
      currentPhysicalPlan
    }
  }

// SparkContext
  /**
   * Submit a map stage for execution. This is currently an internal API only, but might be
   * promoted to DeveloperApi in the future.
   */
  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C])
      : SimpleFutureAction[MapOutputStatistics] = {
    assertNotStopped()
    val callSite = getCallSite()
    var result: MapOutputStatistics = null
    val waiter = dagScheduler.submitMapStage(
      dependency,
      (r: MapOutputStatistics) => { result = r },
      callSite,
      localProperties.get)
    new SimpleFutureAction[MapOutputStatistics](waiter, result)
  }


// DAGScheduler
  def submitMapStage[K, V, C](
      dependency: ShuffleDependency[K, V, C],
      callback: MapOutputStatistics => Unit,
      callSite: CallSite,
      properties: Properties): JobWaiter[MapOutputStatistics] = {

    val rdd = dependency.rdd
    val jobId = nextJobId.getAndIncrement()
    if (rdd.partitions.length == 0) {
      throw new SparkException("Can't run submitMapStage on RDD with 0 partitions")
    }

    // We create a JobWaiter with only one "task", which will be marked as complete when the whole
    // map stage has completed, and will be passed the MapOutputStatistics for that stage.
    // This makes it easier to avoid race conditions between the user code and the map output
    // tracker that might result if we told the user the stage had finished, but then they queries
    // the map output tracker and some node failures had caused the output statistics to be lost.
    val waiter = new JobWaiter[MapOutputStatistics](
      this, jobId, 1,
      (_: Int, r: MapOutputStatistics) => callback(r))
    eventProcessLoop.post(MapStageSubmitted(
      jobId, dependency, callSite, waiter, Utils.cloneProperties(properties)))
    waiter
  }

At present, the list of physical-plan optimizer rules in AdaptiveSparkPlanExec is as follows:

// AdaptiveSparkPlanExec
  @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq(
    ReuseAdaptiveSubquery(conf, context.subqueryCache),
    CoalesceShufflePartitions(context.session),
    // The following two rules need to make use of 'CustomShuffleReaderExec.partitionSpecs'
    // added by `CoalesceShufflePartitions`. So they must be executed after it.
    OptimizeSkewedJoin(conf),
    OptimizeLocalShuffleReader(conf),
    ApplyColumnarRulesAndInsertTransitions(conf, context.session.sessionState.columnarRules),
    CollapseCodegenStages(conf)
  )

Among them, OptimizeSkewedJoin targets the join scenarios most prone to data skew:

In AQE mode, before each stage executes, all the stages it depends on have already completed, so the stats of every finished stage are available.
If a shuffle partition is more than 5 times the median partition size and its output exceeds 256M, it is judged to be skewed, and the partition's data is split into N chunks according to targetSize, where
targetSize = max(64M, average size of the non-skewed partitions).
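The detection rule above can be sketched directly from its description. The 5x median factor, the 256MB floor, and the 64MB minimum target mirror the text; in real Spark all of these are configurable, and this toy ignores edge cases such as duplicate partition sizes, so treat it as an illustration only.

```python
import math
import statistics

def detect_skew(sizes_mb, factor=5, min_skew_mb=256, min_target_mb=64):
    """Return {skewed partition size: number of splits} per the rule described above."""
    median = statistics.median(sizes_mb)
    skewed = [s for s in sizes_mb if s > factor * median and s > min_skew_mb]
    normal = [s for s in sizes_mb if s not in skewed] or sizes_mb
    # targetSize = max(64M, average size of the non-skewed partitions)
    target = max(min_target_mb, sum(normal) / len(normal))
    return {s: math.ceil(s / target) for s in skewed}

# four ~100MB partitions and one 800MB straggler: 800 > 5 * median(100) and > 256
print(detect_skew([100, 90, 110, 100, 800]))  # -> {800: 8}
```

With a 100MB target size, the 800MB straggler is cut into 8 sub-partitions, each roughly the size of a healthy partition.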

The shuffle before this optimization:


The shuffle after the optimization:


Spark 3.0 AQE in practice at FreeWheel

The FreeWheel team managed to go live in production before the 2020 Christmas advertising season. Overall performance improved by up to 40% (for large batches), and AWS cost savings averaged between 25% and 30%, about a million dollars a year.

Major upgrade changes

The main configuration for enabling the new Spark 3.0 AQE features is:

  "spark.sql.adaptive.enabled": true,
  "spark.sql.adaptive.coalescePartitions.enabled": true,
  "spark.sql.adaptive.coalescePartitions.minPartitionNum": 1,
  "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB"

Note that the AQE feature only removes the need to specify the reducer count for the reduce stage; it does not mean you no longer need to set task parallelism at all. The map stage still has to split the data into suitably sized partitions, and if parallelism is unspecified it defaults to 200; with very large data volumes that easily leads to OOM. It is recommended to configure spark.sql.shuffle.partitions and spark.default.parallelism according to the parallelism the job needed before the upgrade.
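For instance, the parallelism settings could be pinned alongside the AQE flags at submission time. This is an illustrative config fragment only; the value 2000 is a placeholder to be tuned per workload, not a recommendation.

```
# Illustrative spark-submit flags (values are placeholders)
spark-submit \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.shuffle.partitions=2000 \
  --conf spark.default.parallelism=2000 \
  ...
```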

Let's look more closely at why upgrading to 3.0 both shortens run time and saves cluster cost, taking the build of one table in Optimus data modeling as an example:

  • In the reduce stage, the task count dropped from 40320 without AQE to 4580 with it, an order of magnitude fewer.
  • In the figure below, the lower half shows the task situation on Spark 2.x without AQE, and the upper half shows Spark 3.x with the AQE feature enabled.


  • The more detailed runtime view shows that the same aggregate operation after the shuffle reader also went from 4.44h to 2.56h, saving nearly half the time.
  • The left side shows the detailed runtime metrics on Spark 2.x; the right side shows the metrics after enabling AQE and going through the custom shuffle reader.


Performance improvement

AQE performance

As described earlier, AQE adjusts and optimizes the entire Spark SQL execution process, and its biggest highlight is that it continuously feeds the real, accurate runtime statistics of completed plan nodes back into re-optimizing the remaining execution plan.


AQE automatically adjusts the reducer count and reduces the number of partitions. Choosing task parallelism has always been a pain point for Spark users. If parallelism is too high, there are too many tasks and the overhead grows, slowing the job down as a whole. If it is too low, the data partitions get large, OOM problems appear easily, resources cannot be used sensibly, and the benefit of running tasks in parallel cannot be fully realized.

Moreover, because a Spark context's task parallelism must be set at the start and cannot be changed dynamically, it often happens that a job needs high parallelism for its large initial data volume, while transformations and filtering shrink the dataset dramatically during the run, leaving the initial partition count far too large. AQE solves this nicely: when reducers read the data, it automatically coalesces small partitions according to the partition data size the user sets (spark.sql.adaptive.advisoryPartitionSizeInBytes), adaptively reducing the partition count, cutting resource waste and overhead, and improving job performance.

As the single-table example above shows, enabling AQE sharply lowers the task count. Besides lightening the driver's load, it also reduces the scheduling, memory, and startup-management overhead of launching tasks, lowers CPU usage, and improves I/O performance.

Take the historical-data pipelines as an example: more than 30 tables run as parallel Spark jobs at the same time. Each table gets a sizable performance boost, so the other tables obtain resources sooner, the jobs benefit each other, and the whole data modeling process naturally speeds up.

Large batches (>200G) improve considerably more than small batches (<100G), by up to 40%, mainly because large batches carry more data, need more machines, and are configured with higher concurrency, giving the AQE features more room to show themselves. Small batches run with relatively low concurrency, so the gain is smaller, but there is still roughly 27.5% acceleration.

Memory optimization

Beyond enabling AQE to reduce the memory footprint of fragmented tasks, Spark 3.0 also made many other memory optimizations, for example slimming down some of the Aggregate metrics, a shared memory pool for Netty, fixing a Task Manager deadlock, and avoiding waits on shuffle blocks read from the network in certain scenarios, all to relieve memory pressure. With this series of memory optimizations plus AQE, the memory charts show cluster memory usage falling by about 30%.

Practical achievements

The main results of the upgrade are as follows:

Significantly improved performance

  • For large batches of data (200~400G per hour), the historical-data pipeline's performance improved by up to 40%; for small batches (under 100G per hour) the effect is less pronounced than for large ones. Averaged over all daily batches, the improvement is about 27.5%.

  • Prediction-data performance improved by 30% on average. Because the data input sources differ, two pipelines currently run, one for historical and one for prediction data, and they generate different numbers of tables, so we evaluated them separately.

Taking the end-to-end run time of the historical data in production as an example, the run time of the whole pipeline visibly dropped, so data can be delivered to downstream consumers faster.


Reduced cluster memory usage

Cluster memory usage fell by about 30% for large batches, and by about 25% on an average day.

Taking a Ganglia screenshot of cluster memory during a historical-data run as an example, the memory usage of the whole cluster dropped from 41.2T to 30.1T, which means we can run the same Spark jobs with fewer machines and less money.


Lower AWS cost

The pipelines implement an automatic scale-in/scale-out strategy: the cluster adds Task nodes when resources are needed and automatically shrinks them back after the job, and an algorithm learns the optimal machine count for each batch's data size. After upgrading to Spark 3.0, jobs run faster and need fewer machines; post-launch statistics show AWS cost savings of about 30% per day, on the order of a million dollars saved for the company per year.

Follow along with the 《大数据成神之路》 (The Road to Big Data Mastery) series of articles.


Copyright notice
This article was created by Wang Zhiwu. Please include a link to the original when reposting. Thank you.
https://chowdera.com/2021/01/20210123184458176h.html
