一、源码分析
1、
###入口org.apache.spark.sql/SQLContext.scalasql()方法:/** * 使用Spark执行一条SQL查询语句,将结果作为DataFrame返回,SQL解析使用的方言,可以 * 通过spark.sql.dialect参数,来进行设置 */ def sql(sqlText: String): DataFrame = { // 首先,查看我们通过SQLContext.setConf()方法设置的参数,Spark.sql.dialect, // 如果是sql方言,就进入接下来的执行,如果不是sql,就直接报错 // 这里的conf就是SQLConf if (conf.dialect == "sql") { // SQLContext的sql()方法正式进入执行阶段,Spark SQL也是有lazy特性的,其实,调用sql()去执行一条SQL语句的时候 // 默认只会调用SqlParser组件针对SQL语句生成一个Unresolved LogicPlan,然后,将Unresolved LogicPlan和SQLXontext // 自身的实例(this),封装为一个DataFrame,返回DataFrame给用户,其中仅仅封装了SQL语句的Unresolved LogicPlan // 在用户拿到了封装了Unresolved LogicPlan的DataFrame之后,可以执行一些show()、select().show()、groupBy().show() // 或者拿到DataFrame对应的RDD,执行一系列transformation操作,最后执行一个Action后 // 才会去触发Spark SQL后续的SQL执行流程,包括Analyzer、Optimizer、SparkPlan、execute PysicalPlan // 首先看parseSql()方法,传入SQL语句,调用SqlParser解析SQL,获取Unresolved LogicPlan // parseSql(),总结一下,就是调用了SqlParser的apply()方法,即由SqlParser将SQL语句通过内部的各种select、insert这种词法、语法解析器 // 来进行解析,然后将SQL语句的各个部分,组装成一个LogicalPlan,但是这里的LogicalPlan,只是一颗语法树,还不知道自己具体执行计划的时候, // 数据从哪里来,所以,叫做UnResolved LogicalPlan,解析了SQL,拿到了UnResolved LogicalPlan,会封装成一个DataFrame,返回给用户, // 用户此时就开始用DataFrame执行各种操作了 DataFrame(this, parseSql(sqlText)) } else { sys.error(s"Unsupported SQL dialect: ${conf.dialect}") } }setConf()方法:protected[sql] lazy val conf: SQLConf = new SQLConf /** * 如果要给Spark SQL设置一些参数,那么要使用SQLContext.setConf()方法,底层是会将配置信息放入SQLConf对象中的 */ def setConf(props: Properties): Unit = conf.setConf(props)parseSql()方法: // sqlParser 实际上是SparkSQLParser的实例,SparkSQLParser里面,又封装了catalyst的SqlParser @transient protected[sql] val sqlParser = { val fallback = new catalyst.SqlParser new SparkSQLParser(fallback(_)) } protected[sql] def parseSql(sql: String): LogicalPlan = { // parseSql()方法,是SqlParser执行的入口,实际上,会调用SqlParser的apply()方法,来获取一个对SQL语句解析后的LogicalPlan ddlParser(sql, false).getOrElse(sqlParser(sql)) }parseSql()会调SqlParser的apply()方法SqlParser这个类在org.apache.spark.sql.catalyst包下,其继承了AbstractSparkSQLParser 类class SqlParser extends AbstractSparkSQLParser {parseSql()会调SqlParser的apply()方法,会调AbstractSparkSQLParser的apply()方法private[sql] abstract class AbstractSparkSQLParser extends StandardTokenParsers with PackratParsers { /** * 实际上,调用SqlParser的apply()方法,将SQL解析成LogicalPlan时,会调用到SqlParser的父类,AbstractSparkSQLParser * 的apply()方法 */ def apply(input: String): LogicalPlan = { // Initialize the Keywords. lexical.initialize(reservedWords) // 这个代码的意思,就是用lexical.Scanner,针对SQL语句,来进行语法检查、分析,满足语法检查结果的话,就使用SQL解析器 // 针对SQL进行解析,包括词法解析(将SQL语句解析成一个个短语,token)、语法解析,最后生成一个Unresolved LogicalPlan // 该LogicalPlan仅仅针对SQL语句本身生成,纯语法,不设计任何关联的数据源等等信息 phrase(start)(new lexical.Scanner(input)) match { case Success(plan, _) => plan case failureOrError => sys.error(failureOrError.toString) } }}###org.apache.spark.sql.catalyst/AbstractSparkSQLParser.scala/** * 用SqlLexical,对SQL语句,执行一个检查,如果满足检查的话,那么才去分析, * 否则,说明SQL语句本身的语法,是有问题的 */ class SqlLexical extends StdLexical { case class FloatLit(chars: String) extends Token { override def toString = chars }}/** * 从这里可以看出来,Spark SQL是支持两种主要的SQL语法的,包括select语句和insert语句 */ protected lazy val start: Parser[LogicalPlan] = ( (select | ("(" ~> select <~ ")")) * ( UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } | INTERSECT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Intersect(q1, q2) } | EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} | UNION ~ DISTINCT.? ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) } ) | insert ) /** * 这里,就是对select语句执行解析,语句里面可以包括FROM,WHERE,GROUP,HAVING,LIMIT */ protected lazy val select: Parser[LogicalPlan] = SELECT ~> DISTINCT.? ~ repsep(projection, ",") ~ (FROM ~> relations).? ~ (WHERE ~> expression).? ~ (GROUP ~ BY ~> rep1sep(expression, ",")).? ~ (HAVING ~> expression).? ~ sortType.? ~ (LIMIT ~> expression).? ^^ { case d ~ p ~ r ~ f ~ g ~ h ~ o ~ l => val base = r.getOrElse(NoRelation) val withFilter = f.map(Filter(_, base)).getOrElse(base) val withProjection = g .map(Aggregate(_, assignAliases(p), withFilter)) .getOrElse(Project(assignAliases(p), withFilter)) val withDistinct = d.map(_ => Distinct(withProjection)).getOrElse(withProjection) val withHaving = h.map(Filter(_, withDistinct)).getOrElse(withDistinct) val withOrder = o.map(_(withHaving)).getOrElse(withHaving) val withLimit = l.map(Limit(_, withOrder)).getOrElse(withOrder) withLimit } /** * 解析INSERT、OVERWRITE这种语法 */ protected lazy val insert: Parser[LogicalPlan] = INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ { case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o) } /** * 查那些列 */ protected lazy val projection: Parser[Expression] = expression ~ (AS.? ~> ident.?) ^^ { case e ~ a => a.fold(e)(Alias(e, _)()) } // Based very loosely on the MySQL Grammar. // http://dev.mysql.com/doc/refman/5.0/en/join.html /** * 会将你的SQL语句里面解析出来的各种token,或者TreeNode,给关联起来,最后组成一颗语法树 * 语法树封装在LogicalPlan中,但是要注意,此时的LogicalPlan,还是Unresolved LogicalPlan */ protected lazy val relations: Parser[LogicalPlan] = ( relation ~ rep1("," ~> relation) ^^ { case r1 ~ joins => joins.foldLeft(r1) { case(lhs, r) => Join(lhs, r, Inner, None) } } | relation )
2、
###org.apache.spark.sql/SQLContext.scala/** * 通过这个Join, left: LogicalPlan,right: LogicalPlan * 意思就是说,将SQL语句的各个部分,通过Spark SQL的规则,组合拼装成一个语法树 */case class Join( left: LogicalPlan, right: LogicalPlan, joinType: JoinType, condition: Option[Expression]) extends BinaryNode {} /** * 实际上,在后面操作DataFrame的时候,在实际真正要执行SQL语句,对数据进行查询,返回结果的时候,会触发SQLContext的executePlan()方法的执行, * 该方法,实际上会返回一个QueryExecution,这个QueryExecution实际上,会触发整个后续的流程 */ protected[sql] def executeSql(sql: String): this.QueryExecution = executePlan(parseSql(sql)) protected[sql] def executePlan(plan: LogicalPlan) = new this.QueryExecution(plan)@DeveloperApi protected[sql] class QueryExecution(val logical: LogicalPlan) { def assertAnalyzed(): Unit = checkAnalysis(analyzed) // 用一个Unresolved LogicalPlan去构造一个QueryExecution的实例对象,那么SQL语句的执行就会一步步触发 // Analyzer的apply()方法执行结束后,得到一个Resolved LogicalPlan lazy val analyzed: LogicalPlan = analyzer(logical) // 会通过CacheManager 执行一个缓存的操作,用一个cacheManager,调用其useCachedData()方法,就是说,如果之前已经缓存过这个执行计划 // 又再次执行的话,那么,其实可以使用缓存中的数据 lazy val withCachedData: LogicalPlan = { assertAnalyzed cacheManager.useCachedData(analyzed) } // 调用Optimizer的apply()方法,针对Resolved LogicalPlan 调用Optimizer,进行优化,获得Optimized LogicalPlan,获得优化后的逻辑执行计划 lazy val optimizedPlan: LogicalPlan = optimizer(withCachedData) // TODO: Don't just pick the first one... // 用SparkPlanner,对Optimizer生成的Optimized LogicalPlan,创建一个SparkPlan lazy val sparkPlan: SparkPlan = { SparkPlan.currentContext.set(self) planner(optimizedPlan).next() } // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. // 使用SparkPlan 生成一个可以执行的SparkPlan,此时就是PhysicalPlan,物理执行计划,直接可以执行了,已经绑定到了数据源,而且 // 知道对各个表的join,如何进行join,包括join的时候,spark内部会对小表进行广播 lazy val executedPlan: SparkPlan = prepareForExecution(sparkPlan) /** Internal version of the RDD. Avoids copies and has no schema */ // 最后一步,调用SparkPlan(封装了PhysicalPlan的SparkPlan)的executor()方法,execute()方法,实际上就会去执行物理执行计划 // execute()方法返回的是RDD[Row],就是一个元素类型为Row的RDD lazy val toRdd: RDD[Row] = executedPlan.execute() protected def stringOrError[A](f: => A): String = try f.toString catch { case e: Throwable => e.toString } def simpleString: String = s"""== Physical Plan == |${stringOrError(executedPlan)} """.stripMargin.trim override def toString: String = // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)}) // however, the `toRdd` will cause the real execution, which is not what we want. // We need to think about how to avoid the side effect. s"""== Parsed Logical Plan == |${stringOrError(logical)} |== Analyzed Logical Plan == |${stringOrError(analyzed)} |== Optimized Logical Plan == |${stringOrError(optimizedPlan)} |== Physical Plan == |${stringOrError(executedPlan)} |Code Generation: ${stringOrError(executedPlan.codegenEnabled)} |== RDD == """.stripMargin.trim } /** * Analyzer的所在地,QueryExecution实际执行SQL语句的时候,第一步,就是用之前SqlParser解析出来的纯逻辑的封装了语法树的Unresolved LogicalPlan * 调用Analyzer的apply()方法,将Unresolved LogicalPlan生成一个Resolved LogicalPlan */ @transient protected[sql] lazy val analyzer: Analyzer = new Analyzer(catalog, functionRegistry, caseSensitive = true) { override val extendedResolutionRules = ExtractPythonUdfs :: sources.PreInsertCastAndRename :: Nil }Analyzer这个类在org.apache.spark.sql.catalyst.analysis包下/** * Analyzer的父类是RuleExecutor,调用Analyzer的apply()方法,实际上会调用RuleExecutor的apply()方法,并传入一个Unresolved LogicalPlan */class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Boolean, maxIterations: Int = 100) extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {###org.apache.spark.sql.catalyst.rules/RuleExecutor.scalla /** * 调用这个apply()方法,做了一些事情,总重要的一件事情,就是将这个LogicalPlan与它要查询的数据源绑定起来,从而让 * Unresolved LogicalPlan 变成一个Resolved LogicalPlan */ def apply(plan: TreeType): TreeType = { var curPlan = plan batches.foreach { batch => val batchStartPlan = curPlan var iteration = 1 var lastPlan = curPlan var continue = true}
3、Optimizer
接着看Optimizer,不关心apply()方法,关注它的优化逻辑Optimizer在org.apache.spark.sql.catalyst.optimizeobject DefaultOptimizer extends Optimizer { // 这里的batches是非常重要的,这里封装了每一个Spark SQL版本中,可以对逻辑执行计划执行的优化策略,在这里,重点理解Optimizer的各种优化策略 // 这样,才清楚,Spark SQL 内部是如何对我们写的SQL语句进行优化的,我们可以再编写SQL语句的时候,直接用优化策略建议的方式来编写SQL语句,传递给 // Spark SQL执行的SQL语句,本身就已经是最优的,这样,就可以避免在执行SQL解析的时候,进行大量的Spark SQL内部的优化,这样,在某种程度上,也可以提升性能 // val batches = Batch("Combine Limits", FixedPoint(100), CombineLimits) :: // CombineLimits,就是合并limit语句,比如,你的SQL语句中,有多个limit子句,那么在这里会进行合并,取一个并集就可以了,这样的话 // 在后面SQL执行的时候,limit就执行一次就好,所以,我们再写SQL的时候,尽量就写一个limit Batch("ConstantFolding", FixedPoint(100), NullPropagation,// 针对NULL的优化,尽量避免出现null的情况,否则,null是很容易产生数据倾斜的 ConstantFolding,// 针对常量的优化,在这里直接计算可以获得的常量,所以我们自己对可能出现的常量尽量直接给出 LikeSimplification, // like的简化优化 BooleanSimplification,// boolean的简化优化 SimplifyFilters, SimplifyCasts, SimplifyCaseConversionExpressions, OptimizeIn) :: Batch("Decimal Optimizations", FixedPoint(100), DecimalAggregates) :: Batch("Filter Pushdown", FixedPoint(100), UnionPushdown, // 将union下推,意思是和filter pushdown ,就是说将union where这种子句,下推到子查询中进行,尽量早的执行union操作和where // 操作,避免在外层查询,针对大量的数据,执行where操作 CombineFilters, // 合并filter,就是合并where子句,比如子查询中有针对某个字段的where子句,外层查询中,也有针对同样一个字段的where子句, // 那么此时是可以合并where子句的,只保留一个即可,取并集即可,所以自己写SQL的时候哦,也要注意where的使用,如果针对一个字段,写一次就好 PushPredicateThroughProject, PushPredicateThroughJoin, PushPredicateThroughGenerate, ColumnPruning) :: // ColumnPruning 列剪裁,就是针对你要查询的列进行剪裁,自己写SQL,如果表中有n个字段,但是你只需要查询一个字段, 那么就用select x from 不要使用select * from Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Nil}###org.apache.spark.sql/SQLContext.scala /** * 用一些策略,比如说DataSourceStrategy,针对逻辑执行计划,执行进一步的具体化和物化 */ protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext: SparkContext = self.sparkContext val sqlContext: SQLContext = self def codegenEnabled = self.conf.codegenEnabled def numPartitions = self.conf.numShufflePartitions def strategies: Seq[Strategy] = experimental.extraStrategies ++ ( DataSourceStrategy :: DDLStrategy :: TakeOrdered :: HashAggregation :: LeftSemiJoin :: HashJoin :: InMemoryScans :: ParquetOperations :: BasicOperators :: CartesianProduct :: BroadcastNestedLoopJoin :: Nil)}