文件分区和分桶是 Spark SQL 中常用的优化技巧。这两种方法可通过预聚合文件或目录中的数据,来帮助减少数据偏差和数据混洗。按持久表的格式保存时,可对 DataFrame 进行排序、分区或分桶。分区能够基于给定列将文件存储在目录层次结构中,从而优化读取。例如,若按年份对 DataFrame 进行分区:
df.write.format("parquet")
.partitionBy("year")
.option("path", "/data ")
.saveAsTable("taxi")
目录将具有以下结构:
After partitioning the data, when queries are made with filter operators on the partition column, the Spark SQL catalyst optimizer pushes down the partition filter to the datasource. The scan reads only the directories that match the partition filters, reducing disk I/O and data loaded into memory. For example, the following query reads only the files in the year = '2019' 目录。
df.filter("year = '2019')
.groupBy("year").avg("fareamount")
When visualizing the physical plan for this query, you will see Scan PrunedInMemoryFileIndex[ /data/year=2019], PartitionFilters: [ (year = 2019)]。
与分区类似,分桶也会按值分割数据。但分桶是通过对桶值哈希来将数据分发到固定数量的桶内,而分区则会为每个分区列值创建一个目录。表可以分桶存储在多个值上,不论是否进行分区,均可使用分桶操作。如果我们在上方示例中添加存储桶,目录结构将与先前保持相同,但年份目录中的数据文件会按小时分为四个存储桶。
df.write.format("parquet")
.partitionBy("year")
.bucketBy(4,"hour")
.option("path", "/data ")
.saveAsTable("taxi")
在对数据进行分桶之后,对已分桶存储的值进行聚合和连接(宽依赖 transformation)时将不必在分区之间混洗数据,由此能减少网络和磁盘的 I/O。此外,桶过滤器修剪功能还将推送至数据源,以减少磁盘 I/O 和加载到内存中的数据。下方查询按年份将分区过滤器下推到数据源,同时避免数据混洗,以按小时聚合。
df.filter("year = '2019')
.groupBy("hour")
.avg("hour")
分区仅可与过滤查询中的常用列结合使用,并且列值的数量有限,同时还将具有足够的对应数据以在目录中分发文件。对小文件过度使用并行法将导致效率降低,而大文件过少则会损害并行性。当唯一分桶列值的数量很大且该分桶列频繁用于查询时,您将可获得出色的分桶效果。