-
Notifications
You must be signed in to change notification settings - Fork 228
[AURON #2378] Support runtime filters in native Iceberg scan #2379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
e22b0d1
ceaae88
261fe45
edf24ab
e0edb9a
8ed52f6
e00951c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,11 +19,12 @@ package org.apache.spark.sql.auron.iceberg | |
| import scala.collection.JavaConverters._ | ||
| import scala.util.control.NonFatal | ||
|
|
||
| import org.apache.commons.lang3.reflect.MethodUtils | ||
| import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask} | ||
| import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} | ||
| import org.apache.iceberg.spark.source.AuronIcebergSourceUtil | ||
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.auron.NativeConverters | ||
| import org.apache.spark.sql.auron.{NativeConverters, Shims} | ||
| import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr} | ||
| import org.apache.spark.sql.catalyst.trees.TreeNodeTag | ||
| import org.apache.spark.sql.connector.read.{InputPartition, Scan} | ||
|
|
@@ -55,6 +56,8 @@ final case class IcebergScanPlan( | |
| object IcebergScanSupport extends Logging { | ||
| private val scanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( | ||
| "auron.iceberg.scan.plan") | ||
| private val runtimeFilteredScanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( | ||
| "auron.iceberg.runtime.filtered.scan.plan") | ||
|
|
||
| private val SparkChangelogScanClassName = | ||
| "org.apache.iceberg.spark.source.SparkChangelogScan" | ||
|
|
@@ -82,35 +85,54 @@ object IcebergScanSupport extends Logging { | |
| } | ||
| } | ||
|
|
||
| def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { | ||
| exec.getTagValue(scanPlanTag) match { | ||
| def plan(exec: BatchScanExec, useRuntimeFilters: Boolean = false): Option[IcebergScanPlan] = { | ||
| val tag = | ||
| if (useRuntimeFilters && exec.runtimeFilters.nonEmpty) { | ||
| runtimeFilteredScanPlanTag | ||
| } else { | ||
| scanPlanTag | ||
| } | ||
| exec.getTagValue(tag) match { | ||
| case Some(cached) => cached | ||
| case None => | ||
| val planned = planUncached(exec) | ||
| exec.setTagValue(scanPlanTag, planned) | ||
| val planned = planUncached(exec, useRuntimeFilters) | ||
| exec.setTagValue(tag, planned) | ||
| planned | ||
| } | ||
| } | ||
|
|
||
| private def planUncached(exec: BatchScanExec): Option[IcebergScanPlan] = { | ||
| def withRuntimeFilters( | ||
| exec: BatchScanExec, | ||
| runtimeFilters: Seq[SparkExpression]): BatchScanExec = { | ||
| if (exec.runtimeFilters == runtimeFilters) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This guard […] Two things follow from that. First, all five […] Is this intentional groundwork for a future path that builds the node with filters different from […] |
||
| exec | ||
| } else { | ||
| Shims.get.copyBatchScanExecWithRuntimeFilters(exec, runtimeFilters) | ||
| } | ||
| } | ||
|
|
||
| private def planUncached( | ||
| exec: BatchScanExec, | ||
| useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { | ||
| val scan = exec.scan | ||
| val scanClassName = scan.getClass.getName | ||
| // Only handle Iceberg scans; other sources must stay on Spark's path. | ||
| if (scanClassName == SparkChangelogScanClassName) { | ||
| return planChangelogScan(exec, scan) | ||
| return planChangelogScan(exec, scan, useRuntimeFilters) | ||
| } | ||
|
|
||
| if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) { | ||
| return None | ||
| } | ||
|
|
||
| planFileScan(exec, scan, scanClassName) | ||
| planFileScan(exec, scan, scanClassName, useRuntimeFilters) | ||
| } | ||
|
|
||
| private def planFileScan( | ||
| exec: BatchScanExec, | ||
| scan: Scan, | ||
| scanClassName: String): Option[IcebergScanPlan] = { | ||
| scanClassName: String, | ||
| useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { | ||
| val readSchema = scan.readSchema | ||
| val schemas = supportedSchemas(readSchema, isChangelogScan = false) | ||
| if (schemas.isEmpty) { | ||
|
|
@@ -143,7 +165,7 @@ object IcebergScanSupport extends Logging { | |
| missingFieldIds.isEmpty, | ||
| s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") | ||
|
|
||
| val partitions = inputPartitions(exec) | ||
| val partitions = inputPartitions(exec, useRuntimeFilters) | ||
| // Empty scan (e.g. empty table) should still build a plan to return no rows. | ||
| if (partitions.isEmpty) { | ||
| logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") | ||
|
|
@@ -203,15 +225,18 @@ object IcebergScanSupport extends Logging { | |
| fieldIdsByName)) | ||
| } | ||
|
|
||
| private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { | ||
| private def planChangelogScan( | ||
| exec: BatchScanExec, | ||
| scan: Scan, | ||
| useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { | ||
| val readSchema = scan.readSchema | ||
| val schemas = supportedSchemas(readSchema, isChangelogScan = true) | ||
| if (schemas.isEmpty) { | ||
| return None | ||
| } | ||
| val (fileSchema, partitionSchema) = schemas.get | ||
|
|
||
| val partitions = inputPartitions(exec) | ||
| val partitions = inputPartitions(exec, useRuntimeFilters) | ||
| if (partitions.isEmpty) { | ||
| return Some( | ||
| IcebergScanPlan( | ||
|
|
@@ -326,7 +351,16 @@ object IcebergScanSupport extends Logging { | |
/** Returns true when `deletes` is null or holds no elements. */
private def deletesEmpty(deletes: java.util.List[_]): Boolean =
  Option(deletes).forall(_.isEmpty)
|
|
||
| private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { | ||
| private def inputPartitions( | ||
| exec: BatchScanExec, | ||
| useRuntimeFilters: Boolean): Seq[InputPartition] = { | ||
| if (useRuntimeFilters) { | ||
| runtimeFilteredPartitions(exec) match { | ||
| case Some(partitions) => return partitions | ||
| case None => | ||
| } | ||
| } | ||
|
|
||
| // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. | ||
| val fromBatch = | ||
| try { | ||
|
|
@@ -382,6 +416,40 @@ object IcebergScanSupport extends Logging { | |
| } | ||
| } | ||
|
|
||
| private def runtimeFilteredPartitions(exec: BatchScanExec): Option[Seq[InputPartition]] = { | ||
| if (exec.runtimeFilters.isEmpty) { | ||
| return None | ||
| } | ||
|
|
||
| try { | ||
| MethodUtils.invokeMethod(exec, true, "prepare") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
| MethodUtils.invokeMethod(exec, true, "waitForSubqueries") | ||
| invokeDeclaredMethod(exec, "filteredPartitions") match { | ||
| case Some(seq: scala.collection.Seq[_]) => | ||
| Some(flattenPartitions(seq)) | ||
| case _ => | ||
| None | ||
| } | ||
| } catch { | ||
| case NonFatal(t) => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This […] |
||
| logWarning( | ||
| s"Failed to obtain runtime-filtered input partitions for ${exec.getClass.getName}.", | ||
| t) | ||
| None | ||
| } | ||
| } | ||
|
|
||
/**
 * Collects every [[InputPartition]] contained in `seq`, descending into nested sequences.
 *
 * Elements are visited in order. NOTE(review): an element that is neither an
 * `InputPartition` nor a `scala.collection.Seq` contributes nothing to the result, so
 * unexpected element types are dropped silently.
 */
private def flattenPartitions(seq: scala.collection.Seq[_]): Seq[InputPartition] = {
  val collected = seq.flatMap { element =>
    element match {
      case single: InputPartition => Seq(single)
      case inner: scala.collection.Seq[_] => flattenPartitions(inner)
      case _ => Seq.empty[InputPartition]
    }
  }
  collected.toSeq
}
|
|
||
| private case class IcebergPartitionView(tasks: Seq[ScanTask]) | ||
|
|
||
| private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,9 +31,9 @@ import org.apache.spark.broadcast.Broadcast | |
| import org.apache.spark.internal.Logging | ||
| import org.apache.spark.sql.SparkSession | ||
| import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} | ||
| import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan} | ||
| import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan, IcebergScanSupport} | ||
| import org.apache.spark.sql.catalyst.InternalRow | ||
| import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal} | ||
| import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, Literal} | ||
| import org.apache.spark.sql.catalyst.plans.physical.SinglePartition | ||
| import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution} | ||
| import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} | ||
|
|
@@ -47,7 +47,10 @@ import org.apache.auron.{protobuf => pb} | |
| import org.apache.auron.jni.JniBridge | ||
| import org.apache.auron.metric.SparkMetricNode | ||
|
|
||
| case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergScanPlan) | ||
| case class NativeIcebergTableScanExec( | ||
| basedScan: BatchScanExec, | ||
| staticPlan: IcebergScanPlan, | ||
| runtimeFilters: Seq[Expression]) | ||
| extends LeafExecNode | ||
| with NativeSupports | ||
| with Logging { | ||
|
|
@@ -60,6 +63,15 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca | |
| override val output = basedScan.output | ||
| override val outputPartitioning = basedScan.outputPartitioning | ||
|
|
||
| private lazy val plan: IcebergScanPlan = { | ||
| if (runtimeFilters.nonEmpty) { | ||
| val filteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters) | ||
| IcebergScanSupport.plan(filteredScan, useRuntimeFilters = true).getOrElse(staticPlan) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When runtime-filtered planning returns […] |
||
| } else { | ||
| staticPlan | ||
| } | ||
| } | ||
|
|
||
| private lazy val fileSchema: StructType = plan.fileSchema | ||
| private lazy val partitionSchema: StructType = plan.partitionSchema | ||
| private lazy val projectableSchema: StructType = | ||
|
|
@@ -213,8 +225,29 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca | |
|
|
||
| override val nodeName: String = "NativeIcebergTableScan" | ||
|
|
||
| // Delegate canonicalization to the original scan to keep plan equivalence checks consistent. | ||
| override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized | ||
/**
 * One-line description of this node: the node name followed by the wrapped scan's own
 * summary and, when this node carries runtime filters, a `runtimeFilters=[...]` suffix.
 */
override def simpleString(maxFields: Int): String = {
  // An empty filter list adds nothing; otherwise mkString supplies the prefix and brackets.
  val filterSuffix =
    if (runtimeFilters.isEmpty) ""
    else runtimeFilters.mkString(", runtimeFilters=[", ", ", "]")
  val scanSummary = basedScan.simpleString(maxFields)
  s"$nodeName ($scanSummary$filterSuffix)"
}
|
|
||
/**
 * Multi-line description of this node: the formatted node name, the output attributes,
 * the wrapped scan's description and the runtime filters, one item per line.
 */
override def verboseStringWithOperatorId(): String = {
  val header = formattedNodeName
  val outputString = output.mkString("[", ", ", "]")
  val scanDescription = basedScan.scan.description()
  val runtimeFiltersString = runtimeFilters.mkString("[", ", ", "]")
  s"""
     |$header
     |Output: $outputString
     |$scanDescription
     |RuntimeFilters: $runtimeFiltersString
     |""".stripMargin
}
|
|
||
// Canonicalize through the wrapped BatchScanExec so the result stays aligned with Spark's
// own scan node; the scan is first handed this node's top-level runtime filters so that
// its canonical form sees them.
override protected def doCanonicalize(): SparkPlan = {
  val scanWithFilters = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters)
  scanWithFilters.canonicalized
}
|
|
||
| private def buildFileSizes(): Map[String, Long] = { | ||
| // Map file path to full file size; tasks may split a file into multiple ranges. | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This groups 4.1 with 3.5/4.0 on the assumption that Spark 4.1's
`BatchScanExec` constructor is still `(output, scan, runtimeFilters, ordering, table, spjParams)`. The shims module compiles for 4.1 even though iceberg doesn't build there, so if 4.1 changed that constructor the 4.1 profile would fail to compile rather than fail a test. Was the 4.1 branch actually built against a 4.1 profile, or is this optimistic grouping ahead of 4.1 GA? If it hasn't been compiled against 4.1 yet, would it be safer to split 4.1 into its own branch (or drop it from the group) until the signature is confirmed?