From e22b0d1813ad5fac19d05c785caf0d503b1b5425 Mon Sep 17 00:00:00 2001 From: linfeng Date: Thu, 2 Jul 2026 23:11:18 +0800 Subject: [PATCH 01/11] support iceberg scan runtime filter --- .../iceberg/IcebergConvertProvider.scala | 3 +- .../auron/iceberg/IcebergScanSupport.scala | 82 +++++++++++++++++-- .../plan/NativeIcebergTableScanExec.scala | 28 ++++++- .../AuronIcebergIntegrationSuite.scala | 35 +++++++- 4 files changed, 134 insertions(+), 14 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index 6a9c3daa8..3da5b3a3c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -55,7 +55,8 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { case e: BatchScanExec => IcebergScanSupport.plan(e) match { case Some(plan) => - AuronConverters.addRenameColumnsExec(NativeIcebergTableScanExec(e, plan)) + AuronConverters.addRenameColumnsExec( + NativeIcebergTableScanExec(e, plan, e.runtimeFilters)) case None => IcebergScanSupport.fallbackReason(e) match { case Some(reason) => throw new AssertionError(reason) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 3aa85b2de..2275fad09 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.auron.iceberg import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import org.apache.commons.lang3.reflect.MethodUtils import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask} import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} import org.apache.iceberg.spark.source.AuronIcebergSourceUtil @@ -55,6 +56,8 @@ final case class IcebergScanPlan( object IcebergScanSupport extends Logging { private val scanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( "auron.iceberg.scan.plan") + private val runtimeFilteredScanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( + "auron.iceberg.runtime.filtered.scan.plan") private val SparkChangelogScanClassName = "org.apache.iceberg.spark.source.SparkChangelogScan" @@ -82,17 +85,41 @@ object IcebergScanSupport extends Logging { } } - def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { - exec.getTagValue(scanPlanTag) match { + def plan(exec: BatchScanExec, useRuntimeFilters: Boolean = false): Option[IcebergScanPlan] = { + val tag = if (useRuntimeFilters) runtimeFilteredScanPlanTag else scanPlanTag + exec.getTagValue(tag) match { case Some(cached) => cached case None => - val planned = planUncached(exec) - exec.setTagValue(scanPlanTag, planned) + val planned = planUncached(exec, useRuntimeFilters) + exec.setTagValue(tag, planned) planned } } - private def planUncached(exec: BatchScanExec): Option[IcebergScanPlan] = { + def withRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[SparkExpression]): BatchScanExec = { + val params = exec.productIterator.toArray + assert(params.length >= 3, s"Unexpected BatchScanExec shape: ${exec.getClass.getName}") + params(2) = runtimeFilters + try { + exec.getClass.getMethods + .find(method => method.getName == "copy" && method.getParameterCount == params.length) + .getOrElse { + throw new NoSuchMethodException( + s"Cannot find compatible BatchScanExec.copy with ${params.length} parameters.") + } + .invoke(exec, params.map(_.asInstanceOf[AnyRef]): _*) + .asInstanceOf[BatchScanExec] + } catch { + case NonFatal(t) => + throw new IllegalStateException("Failed to copy BatchScanExec with runtime filters.", t) + } + } + + private def planUncached( + exec: BatchScanExec, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. @@ -104,13 +131,14 @@ object IcebergScanSupport extends Logging { return None } - planFileScan(exec, scan, scanClassName) + planFileScan(exec, scan, scanClassName, useRuntimeFilters) } private def planFileScan( exec: BatchScanExec, scan: Scan, - scanClassName: String): Option[IcebergScanPlan] = { + scanClassName: String, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val readSchema = scan.readSchema val schemas = supportedSchemas(readSchema, isChangelogScan = false) if (schemas.isEmpty) { @@ -143,7 +171,7 @@ object IcebergScanSupport extends Logging { missingFieldIds.isEmpty, s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") - val partitions = inputPartitions(exec) + val partitions = inputPartitions(exec, useRuntimeFilters) // Empty scan (e.g. empty table) should still build a plan to return no rows. if (partitions.isEmpty) { logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") @@ -326,7 +354,13 @@ object IcebergScanSupport extends Logging { private def deletesEmpty(deletes: java.util.List[_]): Boolean = deletes == null || deletes.isEmpty - private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { + private def inputPartitions( + exec: BatchScanExec, + useRuntimeFilters: Boolean = false): Seq[InputPartition] = { + if (useRuntimeFilters) { + return runtimeFilteredPartitions(exec) + } + // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. val fromBatch = try { @@ -382,6 +416,36 @@ object IcebergScanSupport extends Logging { } } + private def runtimeFilteredPartitions(exec: BatchScanExec): Seq[InputPartition] = { + try { + MethodUtils.invokeMethod(exec, true, "prepare") + MethodUtils.invokeMethod(exec, true, "waitForSubqueries") + invokeDeclaredMethod(exec, "filteredPartitions") match { + case Some(seq: scala.collection.Seq[_]) => + flattenPartitions(seq) + case _ => + Seq.empty + } + } catch { + case NonFatal(t) => + logWarning( + s"Failed to obtain runtime-filtered input partitions for ${exec.getClass.getName}.", + t) + Seq.empty + } + } + + private def flattenPartitions(seq: scala.collection.Seq[_]): Seq[InputPartition] = { + seq.flatMap { + case partition: InputPartition => + Seq(partition) + case nested: scala.collection.Seq[_] => + flattenPartitions(nested) + case _ => + Seq.empty + }.toSeq + } + private case class IcebergPartitionView(tasks: Seq[ScanTask]) private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 3dfa08b65..76b13eab2 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -31,9 +31,9 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} -import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan} +import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan, IcebergScanSupport} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} @@ -47,7 +47,10 @@ import org.apache.auron.{protobuf => pb} import org.apache.auron.jni.JniBridge import org.apache.auron.metric.SparkMetricNode -case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergScanPlan) +case class NativeIcebergTableScanExec( + basedScan: BatchScanExec, + staticPlan: IcebergScanPlan, + runtimeFilters: Seq[Expression]) extends LeafExecNode with NativeSupports with Logging { @@ -60,6 +63,15 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca override val output = basedScan.output override val outputPartitioning = basedScan.outputPartitioning + private lazy val plan: IcebergScanPlan = { + if (runtimeFilters.nonEmpty) { + val filteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters) + IcebergScanSupport.plan(filteredScan, useRuntimeFilters = true).getOrElse(staticPlan) + } else { + staticPlan + } + } + private lazy val fileSchema: StructType = plan.fileSchema private lazy val partitionSchema: StructType = plan.partitionSchema private lazy val projectableSchema: StructType = @@ -213,6 +225,16 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca override val nodeName: String = "NativeIcebergTableScan" + override def simpleString(maxFields: Int): String = { + val runtimeFilterString = + if (runtimeFilters.isEmpty) { + "" + } else { + s", runtimeFilters=[${runtimeFilters.map(_.toString).mkString(", ")}]" + } + s"$nodeName$output$runtimeFilterString" + } + // Delegate canonicalization to the original scan to keep plan equivalence checks consistent. override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 1142b6e03..c4a3e1bfd 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.auron.iceberg -import java.util.UUID +import java.util.{Locale, UUID} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -32,6 +32,7 @@ import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.auron.iceberg.IcebergScanSupport import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.ExplainUtils.collectFirst +import org.apache.spark.sql.execution.FormattedMode import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates @@ -175,6 +176,38 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan preserves dynamic pruning runtime filters") { + withTable("local.db.t_dpp_fact", "local.db.t_dpp_dim") { + sql(""" + |create table local.db.t_dpp_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql("insert into local.db.t_dpp_fact values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3)") + sql("create table local.db.t_dpp_dim using iceberg as select 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "false", + "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + val df = sql(""" + |select f.id, f.v, f.p + |from local.db.t_dpp_fact f + |join local.db.t_dpp_dim d + |on f.p = d.p + |""".stripMargin) + + checkAnswer(df, Seq(Row(2, "b", 2))) + + val explain = df.queryExecution.explainString(FormattedMode) + assert(explain.contains("NativeIcebergTableScan"), explain) + assert(explain.toLowerCase(Locale.ROOT).contains("dynamicpruning"), explain) + } + } + } + test("iceberg native scan is applied for ORC COW table") { withTable("local.db.t_orc") { sql(""" From ceaae888d58219e1bdecca9a77f875fc152a3062 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 11:05:47 +0800 Subject: [PATCH 02/11] refactor IcebergScanSupport to use runtime filters --- .../auron/iceberg/IcebergScanSupport.scala | 35 +++++++++++++------ 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 2275fad09..ca64c96f2 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -86,7 +86,12 @@ object IcebergScanSupport extends Logging { } def plan(exec: BatchScanExec, useRuntimeFilters: Boolean = false): Option[IcebergScanPlan] = { - val tag = if (useRuntimeFilters) runtimeFilteredScanPlanTag else scanPlanTag + val tag = + if (useRuntimeFilters && exec.runtimeFilters.nonEmpty) { + runtimeFilteredScanPlanTag + } else { + scanPlanTag + } exec.getTagValue(tag) match { case Some(cached) => cached case None => @@ -124,7 +129,7 @@ object IcebergScanSupport extends Logging { val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. if (scanClassName == SparkChangelogScanClassName) { - return planChangelogScan(exec, scan) + return planChangelogScan(exec, scan, useRuntimeFilters) } if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) { @@ -231,7 +236,10 @@ object IcebergScanSupport extends Logging { fieldIdsByName)) } - private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { + private def planChangelogScan( + exec: BatchScanExec, + scan: Scan, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val readSchema = scan.readSchema val schemas = supportedSchemas(readSchema, isChangelogScan = true) if (schemas.isEmpty) { @@ -239,7 +247,7 @@ object IcebergScanSupport extends Logging { } val (fileSchema, partitionSchema) = schemas.get - val partitions = inputPartitions(exec) + val partitions = inputPartitions(exec, useRuntimeFilters) if (partitions.isEmpty) { return Some( IcebergScanPlan( @@ -356,9 +364,12 @@ object IcebergScanSupport extends Logging { private def inputPartitions( exec: BatchScanExec, - useRuntimeFilters: Boolean = false): Seq[InputPartition] = { + useRuntimeFilters: Boolean): Seq[InputPartition] = { if (useRuntimeFilters) { - return runtimeFilteredPartitions(exec) + runtimeFilteredPartitions(exec) match { + case Some(partitions) => return partitions + case None => + } } // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. @@ -416,22 +427,26 @@ object IcebergScanSupport extends Logging { } } - private def runtimeFilteredPartitions(exec: BatchScanExec): Seq[InputPartition] = { + private def runtimeFilteredPartitions(exec: BatchScanExec): Option[Seq[InputPartition]] = { + if (exec.runtimeFilters.isEmpty) { + return None + } + try { MethodUtils.invokeMethod(exec, true, "prepare") MethodUtils.invokeMethod(exec, true, "waitForSubqueries") invokeDeclaredMethod(exec, "filteredPartitions") match { case Some(seq: scala.collection.Seq[_]) => - flattenPartitions(seq) + Some(flattenPartitions(seq)) case _ => - Seq.empty + None } } catch { case NonFatal(t) => logWarning( s"Failed to obtain runtime-filtered input partitions for ${exec.getClass.getName}.", t) - Seq.empty + None } } From 261fe454ccf4c1b37356e58e648697d4eb31ab63 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 11:11:12 +0800 Subject: [PATCH 03/11] display --- .../plan/NativeIcebergTableScanExec.scala | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 76b13eab2..996cd667a 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -226,17 +226,28 @@ case class NativeIcebergTableScanExec( override val nodeName: String = "NativeIcebergTableScan" override def simpleString(maxFields: Int): String = { - val runtimeFilterString = - if (runtimeFilters.isEmpty) { - "" + val runtimeFiltersString = + if (runtimeFilters.nonEmpty) { + s", runtimeFilters=${runtimeFilters.mkString("[", ", ", "]")}" } else { - s", runtimeFilters=[${runtimeFilters.map(_.toString).mkString(", ")}]" + "" } - s"$nodeName$output$runtimeFilterString" + s"$nodeName (${basedScan.simpleString(maxFields)}$runtimeFiltersString)" + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |Output: ${output.mkString("[", ", ", "]")} + |${basedScan.scan.description()} + |RuntimeFilters: ${runtimeFilters.mkString("[", ", ", "]")} + |""".stripMargin } - // Delegate canonicalization to the original scan to keep plan equivalence checks consistent. - override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized + // Keep canonicalization aligned with Spark's BatchScanExec, but first make sure it sees + // the top-level runtime filters carried by this native scan. + override protected def doCanonicalize(): SparkPlan = + IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters).canonicalized private def buildFileSizes(): Map[String, Long] = { // Map file path to full file size; tasks may split a file into multiple ranges. From edf24ab64df1cfa2634352f82f28351a76c1d9b6 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 11:45:48 +0800 Subject: [PATCH 04/11] enhance dynamic partition pruning tests for Iceberg --- .../AuronIcebergIntegrationSuite.scala | 178 ++++++++++++++++-- 1 file changed, 165 insertions(+), 13 deletions(-) diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index c4a3e1bfd..a3ccd95a1 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.auron.iceberg.IcebergScanSupport import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.ExplainUtils.collectFirst import org.apache.spark.sql.execution.FormattedMode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates @@ -184,26 +186,55 @@ class AuronIcebergIntegrationSuite |partitioned by (p) |""".stripMargin) sql("insert into local.db.t_dpp_fact values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3)") - sql("create table local.db.t_dpp_dim using iceberg as select 2 as p") + sql("create table local.db.t_dpp_dim using iceberg as select 1 as id, 2 as p") withSQLConf( "spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true", "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", - "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "false", - "spark.sql.autoBroadcastJoinThreshold" -> "-1") { + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { val df = sql(""" - |select f.id, f.v, f.p + |select /*+ BROADCAST(d) */ f.id, f.v, f.p |from local.db.t_dpp_fact f |join local.db.t_dpp_dim d |on f.p = d.p + |where d.id = 1 |""".stripMargin) - checkAnswer(df, Seq(Row(2, "b", 2))) + checkNativeDppScan(df, Seq(Row(2, "b", 2)), "t_dpp_fact", 1L, 1L) + } + } + } - val explain = df.queryExecution.explainString(FormattedMode) - assert(explain.contains("NativeIcebergTableScan"), explain) - assert(explain.toLowerCase(Locale.ROOT).contains("dynamicpruning"), explain) + test("iceberg native scan handles dynamic pruning to empty partitions") { + withTable("local.db.t_dpp_empty_fact", "local.db.t_dpp_empty_dim") { + sql(""" + |create table local.db.t_dpp_empty_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql(""" + |insert into local.db.t_dpp_empty_fact + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + sql("create table local.db.t_dpp_empty_dim using iceberg as select 1 as id, 9 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ f.id, f.v, f.p + |from local.db.t_dpp_empty_fact f + |join local.db.t_dpp_empty_dim d + |on f.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkNativeDppScan(df, Seq.empty[Row], "t_dpp_empty_fact", 0L, 0L) } } } @@ -223,6 +254,39 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native ORC scan applies dynamic pruning runtime filters") { + withTable("local.db.t_orc_dpp_fact", "local.db.t_orc_dpp_dim") { + sql(""" + |create table local.db.t_orc_dpp_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql(""" + |insert into local.db.t_orc_dpp_fact + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + sql("create table local.db.t_orc_dpp_dim using iceberg as select 1 as id, 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ f.id, f.v, f.p + |from local.db.t_orc_dpp_fact f + |join local.db.t_orc_dpp_dim d + |on f.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkNativeDppScan(df, Seq(Row(2, "b", 2)), "t_orc_dpp_fact", 1L, 1L) + } + } + } + test("iceberg native parquet scan reads top-level renamed columns by field id") { withTable("local.db.t_rename") { sql("create table local.db.t_rename (id int, old_name string) using iceberg") @@ -508,6 +572,50 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native changelog scan remains correct in dynamic pruning join") { + withTable("local.db.t_changelog_dpp", "local.db.t_changelog_dpp_dim") { + withTempView("t_changelog_dpp_changes") { + sql(""" + |create table local.db.t_changelog_dpp (id int, v string, p int) + |using iceberg + |partitioned by (p) + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_dpp values (0, 'seed', 0)") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_dpp") + sql(""" + |insert into local.db.t_changelog_dpp + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + val endSnapshotId = currentSnapshotId("local.db.t_changelog_dpp") + createChangelogView( + "local.db.t_changelog_dpp", + "t_changelog_dpp_changes", + startSnapshotId, + endSnapshotId) + sql("create table local.db.t_changelog_dpp_dim using iceberg as select 1 as id, 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ c.id, c.v, c.p, c._change_type + |from t_changelog_dpp_changes c + |join local.db.t_changelog_dpp_dim d + |on c.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkAnswer(df, Seq(Row(2, "b", 2, "INSERT"))) + executedNativeIcebergTableScanExec(df) + } + } + } + } + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") { @@ -680,19 +788,63 @@ class AuronIcebergIntegrationSuite } df } + + private def checkNativeDppScan( + df: DataFrame, + expected: Seq[Row], + tableName: String, + expectedFiles: Long, + expectedPartitions: Long): NativeIcebergTableScanExec = { + checkAnswer(df, expected) + + val nativeScan = executedNativeIcebergTableScanExec(df, tableName) + assert(nativeScan.runtimeFilters.nonEmpty, df.queryExecution.explainString(FormattedMode)) + assert(nativeScan.metrics("numFiles").value === expectedFiles) + assert(nativeScan.metrics("numPartitions").value === expectedPartitions) + + val explain = df.queryExecution.explainString(FormattedMode) + assert(explain.contains("NativeIcebergTableScan"), explain) + assert(explain.toLowerCase(Locale.ROOT).contains("dynamicpruning"), explain) + + nativeScan + } + private def icebergScanPlan(df: DataFrame) = df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec => IcebergScanSupport.plan(scan) }.flatten - private def executedNativeIcebergTableScanExec(df: DataFrame): NativeIcebergTableScanExec = { - val nativeScan = df.queryExecution.executedPlan.collectFirst { - case scan: NativeIcebergTableScanExec => scan - } - assert(nativeScan.nonEmpty) + private def executedNativeIcebergTableScanExec( + df: DataFrame, + tableName: String = ""): NativeIcebergTableScanExec = { + val plan = df.queryExecution.executedPlan match { + case adaptive: AdaptiveSparkPlanExec => adaptive.executedPlan + case other => other + } + val nativeScan = collectMaterializedPlans(plan).collectFirst { + case scan: NativeIcebergTableScanExec + if tableName.isEmpty || scan.basedScan.scan.description().contains(tableName) => + scan + } + assert( + nativeScan.nonEmpty, + s""" + |No NativeIcebergTableScanExec found. + | + |Materialized plan: + |${plan.treeString} + |""".stripMargin) nativeScan.get } + private def collectMaterializedPlans(plan: SparkPlan): Seq[SparkPlan] = { + val actualPlan = plan match { + case stage: QueryStageExec => stage.plan + case other => other + } + actualPlan +: actualPlan.children.flatMap(collectMaterializedPlans) + } + test("native iceberg scan respects SinglePartition for global sort correctness") { withTable("local.db.t_global_sort") { sql("create table local.db.t_global_sort (id int, value string) using iceberg") From e0edb9ace02971dcc102a4829b7174fc77f477fa Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 14:27:51 +0800 Subject: [PATCH 05/11] refactor --- .../src/main/scala/org/apache/spark/sql/auron/Shims.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala index b78d6846b..9dcdfcf1c 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec} import org.apache.spark.sql.execution.metric.SQLMetric @@ -125,6 +126,10 @@ abstract class Shims { generatorOutput: Seq[Attribute], child: SparkPlan): NativeGenerateBase + def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec + def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = (plan.limit, 0) def createNativeGlobalLimitExec( From 8ed52f666fa0b698ba483c8f9e3612c9f84200ba Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 14:29:43 +0800 Subject: [PATCH 06/11] refactor --- .../apache/spark/sql/auron/ShimsImpl.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 2ed0e349e..99a9e088b 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -97,6 +97,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeWindowBase import org.apache.spark.sql.execution.auron.plan.NativeWindowExec import org.apache.spark.sql.execution.auron.shuffle.{AuronBlockStoreShuffleReaderBase, AuronRssShuffleManagerBase, RssPartitionWriterBase} import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec} import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec @@ -301,6 +302,45 @@ class ShimsImpl extends Shims with Logging { child: SparkPlan): NativeGenerateBase = NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput, child) + @sparkver("3.0 / 3.1") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan) + + @sparkver("3.2") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters) + + @sparkver("3.3") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters, exec.keyGroupedPartitioning) + + @sparkver("3.4") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy( + exec.output, + exec.scan, + runtimeFilters, + exec.keyGroupedPartitioning, + exec.ordering, + exec.table, + exec.commonPartitionValues, + exec.applyPartialClustering, + exec.replicatePartitions) + + @sparkver("3.5 / 4.0 / 4.1") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters, exec.ordering, exec.table, exec.spjParams) + @sparkver("3.4 / 3.5 / 4.0 / 4.1") private def effectiveLimit(rawLimit: Int): Int = if (rawLimit == -1) Int.MaxValue else rawLimit From e00951ce189d303f117f0edabedd18e76a951994 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Fri, 3 Jul 2026 14:31:13 +0800 Subject: [PATCH 07/11] refactor withRuntimeFilters to use Shims --- .../auron/iceberg/IcebergScanSupport.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index ca64c96f2..77497a03e 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -24,7 +24,7 @@ import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanT import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} import org.apache.iceberg.spark.source.AuronIcebergSourceUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.auron.{NativeConverters, Shims} import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr} import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.connector.read.{InputPartition, Scan} @@ -104,21 +104,10 @@ object IcebergScanSupport extends Logging { def withRuntimeFilters( exec: BatchScanExec, runtimeFilters: Seq[SparkExpression]): BatchScanExec = { - val params = exec.productIterator.toArray - assert(params.length >= 3, s"Unexpected BatchScanExec shape: ${exec.getClass.getName}") - params(2) = runtimeFilters - try { - exec.getClass.getMethods - .find(method => method.getName == "copy" && method.getParameterCount == params.length) - .getOrElse { - throw new NoSuchMethodException( - s"Cannot find compatible BatchScanExec.copy with ${params.length} parameters.") - } - .invoke(exec, params.map(_.asInstanceOf[AnyRef]): _*) - .asInstanceOf[BatchScanExec] - } catch { - case NonFatal(t) => - throw new IllegalStateException("Failed to copy BatchScanExec with runtime filters.", t) + if (exec.runtimeFilters == runtimeFilters) { + exec + } else { + Shims.get.copyBatchScanExecWithRuntimeFilters(exec, runtimeFilters) } } From 74c889b2567caf510af5ea95042e0c23b1281934 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Sun, 5 Jul 2026 22:19:25 +0800 Subject: [PATCH 08/11] Refine Iceberg runtime filter partition planning --- .../auron/iceberg/IcebergScanSupport.scala | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 77497a03e..c444af41c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -421,20 +421,12 @@ object IcebergScanSupport extends Logging { return None } - try { - MethodUtils.invokeMethod(exec, true, "prepare") - MethodUtils.invokeMethod(exec, true, "waitForSubqueries") - invokeDeclaredMethod(exec, "filteredPartitions") match { - case Some(seq: scala.collection.Seq[_]) => - Some(flattenPartitions(seq)) - case _ => - None - } - } catch { - case NonFatal(t) => - logWarning( - s"Failed to obtain runtime-filtered input partitions for ${exec.getClass.getName}.", - t) + exec.prepare() + MethodUtils.invokeMethod(exec, true, "waitForSubqueries") + invokeDeclaredMethod(exec, "filteredPartitions") match { + case Some(seq: scala.collection.Seq[_]) => + Some(flattenPartitions(seq)) + case _ => None } } From a7211086c50454394cb60de32b23bcf31d5328ac Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Sun, 5 Jul 2026 22:21:11 +0800 Subject: [PATCH 09/11] logWarning Iceberg runtime filter planning fallback --- .../auron/plan/NativeIcebergTableScanExec.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 996cd667a..72ae4906f 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -65,8 +65,16 @@ case class NativeIcebergTableScanExec( private lazy val plan: IcebergScanPlan = { if (runtimeFilters.nonEmpty) { - val filteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters) - IcebergScanSupport.plan(filteredScan, useRuntimeFilters = true).getOrElse(staticPlan) + val runtimeFilteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters) + IcebergScanSupport.plan(runtimeFilteredScan, useRuntimeFilters = true) match { + case Some(runtimeFilteredPlan) => + runtimeFilteredPlan + case None => + logWarning( + "Runtime-filtered Iceberg scan planning was unavailable; " + + "falling back to the unfiltered Iceberg scan plan.") + staticPlan + } } else { staticPlan } From 92dcfaa3aab13837f732ff392ea4c724174c5689 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Sun, 5 Jul 2026 23:06:53 +0800 Subject: [PATCH 10/11] comment --- .../org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index c444af41c..e87118ea9 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -101,6 +101,8 @@ object IcebergScanSupport extends Logging { } } + // Native scans carry runtime filters explicitly, independent from the underlying BatchScanExec. + // If they differ, rebuild the BatchScanExec before asking Spark for filtered partitions. def withRuntimeFilters( exec: BatchScanExec, runtimeFilters: Seq[SparkExpression]): BatchScanExec = { From f51366cb978a672cf45d00ac2e907c6cbb3777c8 Mon Sep 17 00:00:00 2001 From: linfeng <33561138+lyne7-sc@users.noreply.github.com> Date: Sun, 5 Jul 2026 23:08:39 +0800 Subject: [PATCH 11/11] update comment --- .../sql/execution/auron/plan/NativeIcebergTableScanExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 72ae4906f..ae608630c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -252,8 +252,7 @@ case class NativeIcebergTableScanExec( |""".stripMargin } - // Keep canonicalization aligned with Spark's BatchScanExec, but first make sure it sees - // the top-level runtime filters carried by this native scan. + // Canonicalize with the native scan's runtime filters. override protected def doCanonicalize(): SparkPlan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters).canonicalized