You can view the formatted physical plan for a DataFrame by calling the explain("formatted") method. In the physical plan below, the DAG for df2 consists of a Scan of the csv file, a Filter on day_of_week, and a Project (selecting columns) on hour, fare_amount, and day_of_week.
val df = spark.read.option("inferSchema", "false")
  .option("header", true).schema(schema).csv(file)
val df2 = df.select($"hour", $"fare_amount", $"day_of_week").filter($"day_of_week" === "6.0")
df2.show(3)
result:
+----+-----------+-----------+
|hour|fare_amount|day_of_week|
+----+-----------+-----------+
|10.0| 11.5| 6.0|
|10.0| 5.5| 6.0|
|10.0| 13.0| 6.0|
+----+-----------+-----------+
df2.explain("formatted")
result:
== Physical Plan ==
* Project (3)
+- * Filter (2)
+- Scan csv (1)
(1) Scan csv
Location: [dbfs:/FileStore/tables/taxi_tsmall.csv]
Output [3]: [fare_amount#143, hour#144, day_of_week#148]
PushedFilters: [IsNotNull(day_of_week), EqualTo(day_of_week,6.0)]
(2) Filter [codegen id : 1]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]
Condition : (isnotnull(day_of_week#148) AND (day_of_week#148 = 6.0))
(3) Project [codegen id : 1]
Output [3]: [hour#144, fare_amount#143, day_of_week#148]
Input [3]: [fare_amount#143, hour#144, day_of_week#148]
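Besides "formatted", explain in Spark 3.0 and later accepts several other mode strings. A brief sketch, assuming df2 from the example above and an active SparkSession:

```scala
// Sketch: mode arguments accepted by Dataset.explain in Spark 3.0+.
df2.explain("simple")    // physical plan only (same as explain())
df2.explain("extended")  // parsed, analyzed, and optimized logical plans plus the physical plan
df2.explain("codegen")   // generated Java code, when whole-stage codegen applies
df2.explain("cost")      // logical plan annotated with statistics, when available
df2.explain("formatted") // operator outline followed by per-operator details, as shown above
```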
You can see more details about the plan produced by Catalyst on the SQL tab of the web UI. Clicking on the query description link displays the DAG and details for the query.
After the explain in the code below, we see that the physical plan for df3 consists of a Scan, Filter, Project, HashAggregate, Exchange, and HashAggregate. The Exchange is the shuffle caused by the groupBy transformation. Spark performs a hash aggregation for each partition before shuffling the data in the Exchange. After the Exchange, there is a hash aggregation of the previous sub-aggregations. Note that we would have an in-memory scan instead of a file scan in this DAG if df2 were cached.
val df3 = df2.groupBy("hour").count
df3.orderBy(asc("hour")).show(5)
result:
+----+-----+
|hour|count|
+----+-----+
| 0.0| 12|
| 1.0| 47|
| 2.0| 658|
| 3.0| 742|
| 4.0| 812|
+----+-----+
df3.explain
result:
== Physical Plan ==
* HashAggregate (6)
+- Exchange (5)
+- * HashAggregate (4)
+- * Project (3)
+- * Filter (2)
+- Scan csv (1)
(1) Scan csv
Output [2]: [hour, day_of_week]
(2) Filter [codegen id : 1]
Input [2]: [hour, day_of_week]
Condition : (isnotnull(day_of_week) AND (day_of_week = 6.0))
(3) Project [codegen id : 1]
Output [1]: [hour]
Input [2]: [hour, day_of_week]
(4) HashAggregate [codegen id : 1]
Input [1]: [hour]
Functions [1]: [partial_count(1) AS count]
Aggregate Attributes [1]: [count]
Results [2]: [hour, count]
(5) Exchange
Input [2]: [hour, count]
Arguments: hashpartitioning(hour, 200), true, [id=]
(6) HashAggregate [codegen id : 2]
Input [2]: [hour, count]
Keys [1]: [hour]
Functions [1]: [finalmerge_count(merge count) AS count(1)]
Aggregate Attributes [1]: [count(1)]
Results [2]: [hour, count(1) AS count]
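The cached case mentioned above can be sketched as follows; once df2 is cached and materialized, the leaf of this DAG becomes an in-memory scan rather than a file scan. This assumes df2 and df3 from the example above:

```scala
// Sketch: caching df2 so that downstream plans read from memory instead of the csv file.
df2.cache()   // mark df2 for caching (lazy; nothing is stored yet)
df2.count()   // run an action to materialize the cache
df3.explain() // the leaf is now an InMemoryTableScan rather than Scan csv
```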
Clicking the SQL tab link for this query displays the DAG for the job.
Selecting the Expand details checkbox shows detailed information for each stage. The first block, WholeStageCodegen, compiles multiple operators (scan csv, filter, project, and HashAggregate) together into a single Java function to improve performance. Metrics such as the number of rows and spill size are shown on the screen below.
The second block, labeled Exchange, shows the metrics for the shuffle exchange, including the number of shuffle records written and the total data size.
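The hashpartitioning(hour, 200) argument shown in the Exchange reflects the default of 200 shuffle partitions. For a small dataset like this one, the shuffle partition count can be lowered with the spark.sql.shuffle.partitions setting; a sketch, assuming the spark session and df2 from the examples above:

```scala
// Sketch: reducing the number of shuffle partitions for a small dataset.
// spark.sql.shuffle.partitions defaults to 200, which is why the Exchange
// above partitions by hour into 200 partitions.
spark.conf.set("spark.sql.shuffle.partitions", "8")
val df4 = df2.groupBy("hour").count // the Exchange now uses hashpartitioning(hour, 8)
```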