-
sql语句大全之spark sql逻辑计划(优化完)转物理
在org.apache.spark.sql.execution中实现了有所有的数据库操作,但是注意这里仅仅是物理算子,这些操作分为三类:UnaryNode,LeafNode和BinaryNode。
一元节点UnaryNode的操作有:
Aggregate,DebugNode,EXchange,Filter,Generate,Project,Sample,Sort,StopAfter,TopK。
二元节点BinaryNode的操作有:
BroadcastNestedLoopJoin,CartesianProduct,SparkEquiInnerJoin。
叶子节点LeftNode的操作有:
ExistingRdd,ParquetTableScan。
分析一下join操作,join有两个孩子节点,是二元算子,其中会添加projection算子。有一种情况,比如T1表的a,b,c三个属性和T2表的a,d,e三个属性,如果在T1和T2表的a属性上做连接,最后输出三个属性T1.a,T1.b,T2.d。这样的话首先会在T1表上添加projection将a,b属性选出来,然后在T2表上添加Projection将a,d属性选出来,然后连接选出的属性,SparkEquiInnerJoin物理算子如下:
case class SparkEquiInnerJoin(
leftKeys: Seq[Expression],
rightKeys: Seq[Expression],
left: SparkPlan,//左孩子,下面是右孩子
right: SparkPlan) extends BinaryNode {
//outputPartitioning是基类中的函数,在此只是重载。partition策略如何
override def outputPartitioning: Partitioning = left.outputPartitioning
//requiredChildDistribution是基类中的函数,在此是重载。孩子数据分布如何
override def requiredChildDistribution =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
def output = left.output ++ right.output
def execute() = attachTree(this, "execute") {
val leftWithKeys = left.execute().mapPartitions { iter =>
//因为会有left孩子节点和leftWithKeys不一样的schema,所以要根据leftKeys做一次projection操作
val generateLeftKeys = new Projection(leftKeys, left.output)
iter.map(row => (generateLeftKeys(row), row.copy()))
}
val rightWithKeys = right.execute().mapPartitions { iter =>
//因为会有left孩子节点和leftWithKeys不一样的schema,所以要根据leftKeys做一次projection操作
val generateRightKeys = new Projection(rightKeys, right.output)
iter.map(row => (generateRightKeys(row), row.copy()))
}
// Do the join.做连接,连接左和右,得到所有的结果
val joined = filterNulls(leftWithKeys).joinLocally(filterNulls(rightWithKeys))
// Drop join keys and merge input tuples.
// build每一行数据,就是简单的将生成的leftTuple和rightTuple相加
joined.map { case (_, (leftTuple, rightTuple)) => buildRow(leftTuple ++ rightTuple) }
}
/**
* Filters any rows where the any of the join keys is null, ensuring three-valued
* logic for the equi-join conditions.
*/
protected def filterNulls(rdd: RDD[(Row, Row)]) =
rdd.filter {
case (key: Seq[_], _) => !key.exists(_ == null)
}
}
在上面的join物理算子中execute函数式每个物理操作都必须实现的函数,还有两个函数outputPartitioning和requiredChildDistribution,这两个函数的作用呆会探讨。
lazy val sparkPlan = planner(optimizedPlan).next()
lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan)
上面这两步中第一步是从优化过的逻辑执行计划生成物理执行计划,在此调用的QueryPlanner抽象类中的apply方法,这个方法将逻辑执行计划中的每个操作对应strategies中相应的case class来生成具体的物理操作,现在strategies有这么多个,可以在SqlContext类中找到,如下:
/*
* 所有的策略都在这里。
* */
val strategies: Seq[Strategy] =
TopK ::
PartialAggregation ::
SparkEquiInnerJoin ::
ParquetOperations ::
BasicOperators ::
CartesianProduct ::
BroadcastNestedLoopJoin :: Nil
第二步会调用outputPartitioning和requiredChildDistribution这两个函数,是从两个child的分区策略和数据分布情况来确定是否需要添加shuffle操作,具体代码是在Exchange.scala中的AddExchange函数中,在此对物理执行计划分析看是否需要添加shuffle。
以上分析基于spark sql的从优化完的逻辑执行计划到物理执行计划的生成,总体来说分两方面,一方面是利用strategies讲逻辑算子转化为物理算子,还有一方面,因为是分布式系统,少不了的需要在物理执行计划中添加shuffle,接下来:
1,具体分析什么情况下需要添加AddExchange。
2,需要了解所有的物理操作算子,特别是join算子的变形。
本文完
---------------------
作者:egraldloi
来源:CSDN
原文:https://blog.csdn.net/egraldloi/article/details/23139645
版权声明:本文为博主原创文章,转载请附上博文链接!
最新更新
nodejs爬虫
Python正则表达式完全指南
爬取豆瓣Top250图书数据
shp 地图文件批量添加字段
爬虫小试牛刀(爬取学校通知公告)
【python基础】函数-初识函数
【python基础】函数-返回值
HTTP请求:requests模块基础使用必知必会
Python初学者友好丨详解参数传递类型
如何有效管理爬虫流量?
SQL SERVER中递归
2个场景实例讲解GaussDB(DWS)基表统计信息估
常用的 SQL Server 关键字及其含义
动手分析SQL Server中的事务中使用的锁
openGauss内核分析:SQL by pass & 经典执行
一招教你如何高效批量导入与更新数据
天天写SQL,这些神奇的特性你知道吗?
openGauss内核分析:执行计划生成
[IM002]Navicat ODBC驱动器管理器 未发现数据
初入Sql Server 之 存储过程的简单使用
这是目前我见过最好的跨域解决方案!
减少回流与重绘
减少回流与重绘
如何使用KrpanoToolJS在浏览器切图
performance.now() 与 Date.now() 对比
一款纯 JS 实现的轻量化图片编辑器
关于开发 VS Code 插件遇到的 workbench.scm.
前端设计模式——观察者模式
前端设计模式——中介者模式
创建型-原型模式