在第 1 章中,我们探讨了 Spark DataFrame 在集群上的执行机制。在本章中,我们将先概述 DataFrame 和 Spark SQL 编程的优势。
在第 1 章中,我们探讨了 Spark DataFrame 在集群上的执行机制。在本章中,我们将先概述 DataFrame 和 Spark SQL 编程的优势。
借助 Spark SQL 经优化的执行引擎,Spark SQL 和 DataFrame API 可实现易用性、空间效率和性能提升。
Spark SQL 使用内存中的列式格式缓存 DataFrame(调用 dataFrame.cache 时),此格式经优化已能仅扫描必要列、自动调整压缩、尽量减少内存占用和 JVM 垃圾收集。
Spark SQL 矢量化 Parquet 和 ORC 读取器按列分批进行解压缩和解码,使读取速度大约提升至原来的九倍。
Spark SQL 的 Catalyst 优化器可处理逻辑优化和物理规划,同时支持基于规则和基于成本的优化。可行情况下,Spark SQL Whole-Stage Java Code Generation 技术通过为 SQL 查询中的一组运算符按字节码生成单个优化函数,从而优化 CPU 使用率。
在典型的机器学习 (ML) 或深度学习 (DL) 项目中,数据准备和探索占用了分析流程 60% 至 80% 的工作量。为构建 ML 模型,您必须清理、提取、探索和测试数据集,以便找到最有助于提升模型预测准确率的重要特征。为便于解释说明,我们将使用 Spark SQL 探索“出租车”数据集,以分析哪些特征可能有助于预测出租车费用。
以下代码展示了将 CSV 文件中的数据加载到 Spark Dataframe 中的具体方法,同时指定了要加载到 DataFrame 中的数据源和模式(如第 1 章所述)。将 DataFrame 注册为 SQL 临时视图之后,我们可在 SparkSession 上使用 SQL 函数来运行 SQL 查询,而此查询将以 DataFrame 的形式返回结果。缓存 DataFrame,以便 Spark 不必为每个查询重新加载它。另外,Spark 可以使用列式格式将 DataFrame 或表格缓存到内存中,从而减少内存占用并提高性能。
// 如第 1 章所述加载数据
val file = "/data/taxi_small.csv"
val df = spark.read.option("inferSchema", "false")
.option("header", true).schema(schema).csv(file)
// 使用列式格式将 DataFrame 缓存到内存中
df.cache
// 为 Spark SQL 创建 DataFrame 表视图
df.createOrReplaceTempView("taxi")
// 使用列式格式将 taxi 表缓存到内存中
spark.catalog.cacheTable("taxi")
现在,我们可以使用 Spark SQL 并提出一些问题来探索可能影响出租车费用的因素,例如:“一天中每小时的平均车费是多少?”
%sql
select hour, avg(fare_amount)
from taxi
group by hour order by hour
通过使用 Zeppelin 或 Jupyter 等笔记本,我们可以按图形格式显示 SQL 结果。
以下是使用 DataFrame API 进行的相同查询:
df.groupBy("hour").avg("fare_amount")
.orderBy("hour").show(5)
result:
+----+------------------+
|hour| avg(fare_amount)|
+----+------------------+
| 0.0|11.083333333333334|
| 1.0|22.581632653061224|
| 2.0|11.370820668693009|
| 3.0|13.873989218328841|
| 4.0| 14.57204433497537|
+----+------------------+
与平均行程距离对应的平均车费是多少?
%sql
select trip_distance,avg(trip_distance), avg(fare_amount)
from taxi
group by trip_distance order by avg(trip_distance) desc
一天中每小时的平均车费和平均行程距离是多少?
%sql
select hour, avg(fare_amount), avg(trip_distance)
from taxi
group by hour order by hour
不同费率代码所对应的平均车费和平均行程距离是多少?
%sql
select hour, avg(fare_amount), avg(trip_distance)
from taxi
group by rate_code order by rate_code
一周中每天的平均车费和平均行程距离是多少?
%sql
select day_of_week, avg(fare_amount), avg(trip_distance)
from taxi
group by day_of_week order by day_of_week
您可以使用 Spark 的“SQL”选项卡查看查询执行信息,例如查询计划详细信息和 SQL 指标。单击查询链接即可显示作业的 DAG。
单击 DAG 中的“+details”(+ 详细信息),即可显示此阶段的详细信息
单击底部的“Details”(详细信息)链接,即能以文本格式显示逻辑计划和物理规划。
在查询计划详细信息中,您可以看到:
使用 Spark SQL ANALYZE TABLE 表格名 COMPUTE STATISTICS,以利用 Catalyst 规划器内基于成本的优化方式。
“Jobs”(作业)选项卡概要页面显示了作业信息总览,例如所有作业的状态、持续时间和进度,以及整个事件时间线。以下是一些要检查的指标:
“Stage”(阶段)选项卡显示了所有任务的概要指标。您可使用这些指标来识别执行程序或任务分发存在的问题。以下是一些要查看的指标:
“Storage”(存储)选项卡显示了已缓存或持久化保存到磁盘的 DataFrame,以及内存大小和磁盘大小信息。您可以使用“Storage”(存储)选项卡查看已缓存的 DataFrame 是否与内存匹配。如需重复使用 DataFrame 且其大小与内存匹配,则缓存此 DataFrame 可加快执行速度。
“Executors”(执行程序)选项卡显示了为应用程序创建的执行程序的内存、磁盘和任务使用情况信息概要。您可使用此选项卡中的以下选项来确认您的应用程序具有所需的资源量:
文件分区和分桶是 Spark SQL 中常用的优化技巧。这两种方法可通过预聚合文件或目录中的数据,来帮助减少数据偏差和数据混洗。按持久表的格式保存时,可对 DataFrame 进行排序、分区或分桶。分区能够基于给定列将文件存储在目录层次结构中,从而优化读取。例如,若按年份对 DataFrame 进行分区:
df.write.format("parquet")
.partitionBy("year")
.option("path", "/data ")
.saveAsTable("taxi")
目录将具有以下结构:
After partitioning the data, when queries are made with filter operators on the partition column, the Spark SQL catalyst optimizer pushes down the partition filter to the datasource. The scan reads only the directories that match the partition filters, reducing disk I/O and data loaded into memory. For example, the following query reads only the files in the year = '2019' 目录。
df.filter("year = '2019')
.groupBy("year").avg("fareamount")
When visualizing the physical plan for this query, you will see Scan PrunedInMemoryFileIndex[ /data/year=2019], PartitionFilters: [ (year = 2019)]。
与分区类似,分桶也会按值分割数据。但分桶是通过对桶值哈希来将数据分发到固定数量的桶内,而分区则会为每个分区列值创建一个目录。表可以分桶存储在多个值上,不论是否进行分区,均可使用分桶操作。如果我们在上方示例中添加存储桶,目录结构将与先前保持相同,但年份目录中的数据文件会按小时分为四个存储桶。
df.write.format("parquet")
.partitionBy("year")
.bucketBy(4,"hour")
.option("path", "/data ")
.saveAsTable("taxi")
在对数据进行分桶之后,对已分桶存储的值进行聚合和连接(宽依赖 transformation)时将不必在分区之间混洗数据,由此能减少网络和磁盘的 I/O。此外,桶过滤器修剪功能还将推送至数据源,以减少磁盘 I/O 和加载到内存中的数据。下方查询按年份将分区过滤器下推到数据源,同时避免数据混洗,以按小时聚合。
df.filter("year = '2019')
.groupBy("hour")
.avg("hour")
分区仅可与过滤查询中的常用列结合使用,并且列值的数量有限,同时还将具有足够的对应数据以在目录中分发文件。对小文件过度使用并行法将导致效率降低,而大文件过少则会损害并行性。当唯一分桶列值的数量很大且该分桶列频繁用于查询时,您将可获得出色的分桶效果。
在本章中,我们探讨了如何在 Spark SQL 中使用表格数据。上文的代码示例可以重复使用,作为使用 Spark SQL 处理数据的基础。在下一章中,我们将使用相同的数据及 DataFrame 来预测出租车费用。