用于 Spark 的 RAPIDS 加速器不需要用户更改 API,而且它会将支持的 SQL 操作替换为 GPU 操作。为了解已将哪些操作替换为 GPU 操作,您可以调用 explain 方法打印出特定 DataFrame 的物理规划,其中所有以 GPU 为前缀的操作均是在 GPU 上执行。
现在,我们针对第 1 章介绍的某些查询来比较通过 GPU 处理的特定 DataFrame 的物理规划。在下方的物理规划中,DAG 包含 GpuBatchScan、针对 hour 的 GpuFilter 以及针对 hour、fare_amount 和 day_of_week 的 GpuProject(选择列)。使用 CPU 处理时,DAG 包含 FileScan、Filter 和 Project。
// select 和 filter 是窄依赖 transformation
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).show(2)
result:
+----+-----------+
|hour|fare_amount|
+----+-----------+
| 0.0| 10.5|
| 0.0| 12.5|
+----+-----------+
df.select($"hour", $"fare_amount").filter($"hour" === "0.0" ).explain
result:
== Physical Plan ==
*(1) GpuColumnarToRow false<
+- !GpuProject [hour#10, fare_amount#9]
+- GpuCoalesceBatches TargetSize(1000000,2147483647)
+- !GpuFilter (gpuisnotnull(hour#10) AND (hour#10 = 0.0))
+- GpuBatchScan[fare_amount#9, hour#10] GpuCSVScan Location:
InMemoryFileIndex[s3a://spark-taxi-dataset/raw-small/train], ReadSchema: struct<fare_amount:double,hour:double>
请注意,原始计划中的大多数节点如何被 GPU 版本替换。RAPIDS 加速器插入数据格式转换节点(如 GpuColumnarToRow 和 GpuRowToColumnar),以在两种处理之间转换:将在 GPU 上执行的节点的列式处理和将在 CPU 上执行的节点的行处理。 为了解某部分查询未在 GPU 上运行的原因,您可将 spark.rapids.sql.explain 配置设置为 true。系统会将此输出记录到驱动日志中,或以交互模式将其显示在屏幕上。