diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index 2ed0e349e..99a9e088b 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -97,6 +97,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeWindowBase import org.apache.spark.sql.execution.auron.plan.NativeWindowExec import org.apache.spark.sql.execution.auron.shuffle.{AuronBlockStoreShuffleReaderBase, AuronRssShuffleManagerBase, RssPartitionWriterBase} import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec} import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec @@ -301,6 +302,45 @@ class ShimsImpl extends Shims with Logging { child: SparkPlan): NativeGenerateBase = NativeGenerateExec(generator, requiredChildOutput, outer, generatorOutput, child) + @sparkver("3.0 / 3.1") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan) + + @sparkver("3.2") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters) + + @sparkver("3.3") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters, exec.keyGroupedPartitioning) + + @sparkver("3.4") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy( + exec.output, + exec.scan, + runtimeFilters, + exec.keyGroupedPartitioning, + exec.ordering, + exec.table, + exec.commonPartitionValues, + exec.applyPartialClustering, + exec.replicatePartitions) + + @sparkver("3.5 / 4.0 / 4.1") + override def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec = + exec.copy(exec.output, exec.scan, runtimeFilters, exec.ordering, exec.table, exec.spjParams) + @sparkver("3.4 / 3.5 / 4.0 / 4.1") private def effectiveLimit(rawLimit: Int): Int = if (rawLimit == -1) Int.MaxValue else rawLimit diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala index b78d6846b..9dcdfcf1c 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala @@ -49,6 +49,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase import org.apache.spark.sql.execution.datasources.PartitionedFile +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec} import org.apache.spark.sql.execution.metric.SQLMetric @@ -125,6 +126,10 @@ abstract class Shims { generatorOutput: Seq[Attribute], child: SparkPlan): NativeGenerateBase + def copyBatchScanExecWithRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[Expression]): BatchScanExec + def getLimitAndOffset(plan: GlobalLimitExec): (Int, Int) = (plan.limit, 0) def createNativeGlobalLimitExec( diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index 6a9c3daa8..3da5b3a3c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -55,7 +55,8 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { case e: BatchScanExec => IcebergScanSupport.plan(e) match { case Some(plan) => - AuronConverters.addRenameColumnsExec(NativeIcebergTableScanExec(e, plan)) + AuronConverters.addRenameColumnsExec( + NativeIcebergTableScanExec(e, plan, e.runtimeFilters)) case None => IcebergScanSupport.fallbackReason(e) match { case Some(reason) => throw new AssertionError(reason) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 3aa85b2de..e87118ea9 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -19,11 +19,12 @@ package org.apache.spark.sql.auron.iceberg import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import org.apache.commons.lang3.reflect.MethodUtils import org.apache.iceberg.{AddedRowsScanTask, ChangelogOperation, ChangelogScanTask, FileFormat, FileScanTask, MetadataColumns, ScanTask} import org.apache.iceberg.expressions.{And => IcebergAnd, BoundPredicate, Expression => IcebergExpression, Not => IcebergNot, Or => IcebergOr, UnboundPredicate} import org.apache.iceberg.spark.source.AuronIcebergSourceUtil import org.apache.spark.internal.Logging -import org.apache.spark.sql.auron.NativeConverters +import org.apache.spark.sql.auron.{NativeConverters, Shims} import org.apache.spark.sql.catalyst.expressions.{And => SparkAnd, AttributeReference, EqualTo, Expression => SparkExpression, GreaterThan, GreaterThanOrEqual, In, IsNaN, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not => SparkNot, Or => SparkOr} import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.connector.read.{InputPartition, Scan} @@ -55,6 +56,8 @@ final case class IcebergScanPlan( object IcebergScanSupport extends Logging { private val scanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( "auron.iceberg.scan.plan") + private val runtimeFilteredScanPlanTag: TreeNodeTag[Option[IcebergScanPlan]] = TreeNodeTag( + "auron.iceberg.runtime.filtered.scan.plan") private val SparkChangelogScanClassName = "org.apache.iceberg.spark.source.SparkChangelogScan" @@ -82,35 +85,56 @@ object IcebergScanSupport extends Logging { } } - def plan(exec: BatchScanExec): Option[IcebergScanPlan] = { - exec.getTagValue(scanPlanTag) match { + def plan(exec: BatchScanExec, useRuntimeFilters: Boolean = false): Option[IcebergScanPlan] = { + val tag = + if (useRuntimeFilters && exec.runtimeFilters.nonEmpty) { + runtimeFilteredScanPlanTag + } else { + scanPlanTag + } + exec.getTagValue(tag) match { case Some(cached) => cached case None => - val planned = planUncached(exec) - exec.setTagValue(scanPlanTag, planned) + val planned = planUncached(exec, useRuntimeFilters) + exec.setTagValue(tag, planned) planned } } - private def planUncached(exec: BatchScanExec): Option[IcebergScanPlan] = { + // Native scans carry runtime filters explicitly, independent from the underlying BatchScanExec. + // If they differ, rebuild the BatchScanExec before asking Spark for filtered partitions. + def withRuntimeFilters( + exec: BatchScanExec, + runtimeFilters: Seq[SparkExpression]): BatchScanExec = { + if (exec.runtimeFilters == runtimeFilters) { + exec + } else { + Shims.get.copyBatchScanExecWithRuntimeFilters(exec, runtimeFilters) + } + } + + private def planUncached( + exec: BatchScanExec, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. if (scanClassName == SparkChangelogScanClassName) { - return planChangelogScan(exec, scan) + return planChangelogScan(exec, scan, useRuntimeFilters) } if (!AuronIcebergSourceUtil.getClassOfSparkBatchQueryScan.isInstance(scan)) { return None } - planFileScan(exec, scan, scanClassName) + planFileScan(exec, scan, scanClassName, useRuntimeFilters) } private def planFileScan( exec: BatchScanExec, scan: Scan, - scanClassName: String): Option[IcebergScanPlan] = { + scanClassName: String, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val readSchema = scan.readSchema val schemas = supportedSchemas(readSchema, isChangelogScan = false) if (schemas.isEmpty) { @@ -143,7 +167,7 @@ object IcebergScanSupport extends Logging { missingFieldIds.isEmpty, s"Missing Iceberg field ids for columns: ${missingFieldIds.mkString(", ")}") - val partitions = inputPartitions(exec) + val partitions = inputPartitions(exec, useRuntimeFilters) // Empty scan (e.g. empty table) should still build a plan to return no rows. if (partitions.isEmpty) { logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.") @@ -203,7 +227,10 @@ object IcebergScanSupport extends Logging { fieldIdsByName)) } - private def planChangelogScan(exec: BatchScanExec, scan: Scan): Option[IcebergScanPlan] = { + private def planChangelogScan( + exec: BatchScanExec, + scan: Scan, + useRuntimeFilters: Boolean): Option[IcebergScanPlan] = { val readSchema = scan.readSchema val schemas = supportedSchemas(readSchema, isChangelogScan = true) if (schemas.isEmpty) { @@ -211,7 +238,7 @@ object IcebergScanSupport extends Logging { } val (fileSchema, partitionSchema) = schemas.get - val partitions = inputPartitions(exec) + val partitions = inputPartitions(exec, useRuntimeFilters) if (partitions.isEmpty) { return Some( IcebergScanPlan( @@ -326,7 +353,16 @@ object IcebergScanSupport extends Logging { private def deletesEmpty(deletes: java.util.List[_]): Boolean = deletes == null || deletes.isEmpty - private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = { + private def inputPartitions( + exec: BatchScanExec, + useRuntimeFilters: Boolean): Seq[InputPartition] = { + if (useRuntimeFilters) { + runtimeFilteredPartitions(exec) match { + case Some(partitions) => return partitions + case None => + } + } + // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection. val fromBatch = try { @@ -382,6 +418,32 @@ object IcebergScanSupport extends Logging { } } + private def runtimeFilteredPartitions(exec: BatchScanExec): Option[Seq[InputPartition]] = { + if (exec.runtimeFilters.isEmpty) { + return None + } + + exec.prepare() + MethodUtils.invokeMethod(exec, true, "waitForSubqueries") + invokeDeclaredMethod(exec, "filteredPartitions") match { + case Some(seq: scala.collection.Seq[_]) => + Some(flattenPartitions(seq)) + case _ => + None + } + } + + private def flattenPartitions(seq: scala.collection.Seq[_]): Seq[InputPartition] = { + seq.flatMap { + case partition: InputPartition => + Seq(partition) + case nested: scala.collection.Seq[_] => + flattenPartitions(nested) + case _ => + Seq.empty + }.toSeq + } + private case class IcebergPartitionView(tasks: Seq[ScanTask]) private def icebergPartition(partition: InputPartition): Option[IcebergPartitionView] = { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala index 3dfa08b65..ae608630c 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala @@ -31,9 +31,9 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims} -import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan} +import org.apache.spark.sql.auron.iceberg.{IcebergNativeScanTask, IcebergScanPlan, IcebergScanSupport} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, Literal} +import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, Literal} import org.apache.spark.sql.catalyst.plans.physical.SinglePartition import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} @@ -47,7 +47,10 @@ import org.apache.auron.{protobuf => pb} import org.apache.auron.jni.JniBridge import org.apache.auron.metric.SparkMetricNode -case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergScanPlan) +case class NativeIcebergTableScanExec( + basedScan: BatchScanExec, + staticPlan: IcebergScanPlan, + runtimeFilters: Seq[Expression]) extends LeafExecNode with NativeSupports with Logging { @@ -60,6 +63,23 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca override val output = basedScan.output override val outputPartitioning = basedScan.outputPartitioning + private lazy val plan: IcebergScanPlan = { + if (runtimeFilters.nonEmpty) { + val runtimeFilteredScan = IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters) + IcebergScanSupport.plan(runtimeFilteredScan, useRuntimeFilters = true) match { + case Some(runtimeFilteredPlan) => + runtimeFilteredPlan + case None => + logWarning( + "Runtime-filtered Iceberg scan planning was unavailable; " + + "falling back to the unfiltered Iceberg scan plan.") + staticPlan + } + } else { + staticPlan + } + } + private lazy val fileSchema: StructType = plan.fileSchema private lazy val partitionSchema: StructType = plan.partitionSchema private lazy val projectableSchema: StructType = @@ -213,8 +233,28 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca override val nodeName: String = "NativeIcebergTableScan" - // Delegate canonicalization to the original scan to keep plan equivalence checks consistent. - override protected def doCanonicalize(): SparkPlan = basedScan.canonicalized + override def simpleString(maxFields: Int): String = { + val runtimeFiltersString = + if (runtimeFilters.nonEmpty) { + s", runtimeFilters=${runtimeFilters.mkString("[", ", ", "]")}" + } else { + "" + } + s"$nodeName (${basedScan.simpleString(maxFields)}$runtimeFiltersString)" + } + + override def verboseStringWithOperatorId(): String = { + s""" + |$formattedNodeName + |Output: ${output.mkString("[", ", ", "]")} + |${basedScan.scan.description()} + |RuntimeFilters: ${runtimeFilters.mkString("[", ", ", "]")} + |""".stripMargin + } + + // Canonicalize with the native scan's runtime filters. + override protected def doCanonicalize(): SparkPlan = + IcebergScanSupport.withRuntimeFilters(basedScan, runtimeFilters).canonicalized private def buildFileSizes(): Map[String, Long] = { // Map file path to full file size; tasks may split a file into multiple ranges. diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala index 1142b6e03..a3ccd95a1 100644 --- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala +++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.auron.iceberg -import java.util.UUID +import java.util.{Locale, UUID} import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -32,6 +32,9 @@ import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.auron.iceberg.IcebergScanSupport import org.apache.spark.sql.catalyst.trees.TreeNodeTag import org.apache.spark.sql.execution.ExplainUtils.collectFirst +import org.apache.spark.sql.execution.FormattedMode +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates @@ -175,6 +178,67 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native scan preserves dynamic pruning runtime filters") { + withTable("local.db.t_dpp_fact", "local.db.t_dpp_dim") { + sql(""" + |create table local.db.t_dpp_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql("insert into local.db.t_dpp_fact values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3)") + sql("create table local.db.t_dpp_dim using iceberg as select 1 as id, 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ f.id, f.v, f.p + |from local.db.t_dpp_fact f + |join local.db.t_dpp_dim d + |on f.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkNativeDppScan(df, Seq(Row(2, "b", 2)), "t_dpp_fact", 1L, 1L) + } + } + } + + test("iceberg native scan handles dynamic pruning to empty partitions") { + withTable("local.db.t_dpp_empty_fact", "local.db.t_dpp_empty_dim") { + sql(""" + |create table local.db.t_dpp_empty_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |""".stripMargin) + sql(""" + |insert into local.db.t_dpp_empty_fact + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + sql("create table local.db.t_dpp_empty_dim using iceberg as select 1 as id, 9 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ f.id, f.v, f.p + |from local.db.t_dpp_empty_fact f + |join local.db.t_dpp_empty_dim d + |on f.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkNativeDppScan(df, Seq.empty[Row], "t_dpp_empty_fact", 0L, 0L) + } + } + } + test("iceberg native scan is applied for ORC COW table") { withTable("local.db.t_orc") { sql(""" @@ -190,6 +254,39 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native ORC scan applies dynamic pruning runtime filters") { + withTable("local.db.t_orc_dpp_fact", "local.db.t_orc_dpp_dim") { + sql(""" + |create table local.db.t_orc_dpp_fact (id int, v string, p int) + |using iceberg + |partitioned by (p) + |tblproperties ('write.format.default' = 'orc') + |""".stripMargin) + sql(""" + |insert into local.db.t_orc_dpp_fact + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + sql("create table local.db.t_orc_dpp_dim using iceberg as select 1 as id, 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ f.id, f.v, f.p + |from local.db.t_orc_dpp_fact f + |join local.db.t_orc_dpp_dim d + |on f.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkNativeDppScan(df, Seq(Row(2, "b", 2)), "t_orc_dpp_fact", 1L, 1L) + } + } + } + test("iceberg native parquet scan reads top-level renamed columns by field id") { withTable("local.db.t_rename") { sql("create table local.db.t_rename (id int, old_name string) using iceberg") @@ -475,6 +572,50 @@ class AuronIcebergIntegrationSuite } } + test("iceberg native changelog scan remains correct in dynamic pruning join") { + withTable("local.db.t_changelog_dpp", "local.db.t_changelog_dpp_dim") { + withTempView("t_changelog_dpp_changes") { + sql(""" + |create table local.db.t_changelog_dpp (id int, v string, p int) + |using iceberg + |partitioned by (p) + |tblproperties ('format-version' = '2') + |""".stripMargin) + sql("insert into local.db.t_changelog_dpp values (0, 'seed', 0)") + val startSnapshotId = currentSnapshotId("local.db.t_changelog_dpp") + sql(""" + |insert into local.db.t_changelog_dpp + |values (1, 'a', 1), (2, 'b', 2), (3, 'c', 3) + |""".stripMargin) + val endSnapshotId = currentSnapshotId("local.db.t_changelog_dpp") + createChangelogView( + "local.db.t_changelog_dpp", + "t_changelog_dpp_changes", + startSnapshotId, + endSnapshotId) + sql("create table local.db.t_changelog_dpp_dim using iceberg as select 1 as id, 2 as p") + + withSQLConf( + "spark.auron.enable" -> "true", + "spark.auron.enable.iceberg.scan" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.enabled" -> "true", + "spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly" -> "true", + "spark.sql.autoBroadcastJoinThreshold" -> "1024") { + val df = sql(""" + |select /*+ BROADCAST(d) */ c.id, c.v, c.p, c._change_type + |from t_changelog_dpp_changes c + |join local.db.t_changelog_dpp_dim d + |on c.p = d.p + |where d.id = 1 + |""".stripMargin) + + checkAnswer(df, Seq(Row(2, "b", 2, "INSERT"))) + executedNativeIcebergTableScanExec(df) + } + } + } + } + test("iceberg changelog scan falls back when delete changes exist") { withTable("local.db.t_changelog_delete") { withTempView("t_changelog_delete_changes") { @@ -647,19 +788,63 @@ class AuronIcebergIntegrationSuite } df } + + private def checkNativeDppScan( + df: DataFrame, + expected: Seq[Row], + tableName: String, + expectedFiles: Long, + expectedPartitions: Long): NativeIcebergTableScanExec = { + checkAnswer(df, expected) + + val nativeScan = executedNativeIcebergTableScanExec(df, tableName) + assert(nativeScan.runtimeFilters.nonEmpty, df.queryExecution.explainString(FormattedMode)) + assert(nativeScan.metrics("numFiles").value === expectedFiles) + assert(nativeScan.metrics("numPartitions").value === expectedPartitions) + + val explain = df.queryExecution.explainString(FormattedMode) + assert(explain.contains("NativeIcebergTableScan"), explain) + assert(explain.toLowerCase(Locale.ROOT).contains("dynamicpruning"), explain) + + nativeScan + } + private def icebergScanPlan(df: DataFrame) = df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec => IcebergScanSupport.plan(scan) }.flatten - private def executedNativeIcebergTableScanExec(df: DataFrame): NativeIcebergTableScanExec = { - val nativeScan = df.queryExecution.executedPlan.collectFirst { - case scan: NativeIcebergTableScanExec => scan - } - assert(nativeScan.nonEmpty) + private def executedNativeIcebergTableScanExec( + df: DataFrame, + tableName: String = ""): NativeIcebergTableScanExec = { + val plan = df.queryExecution.executedPlan match { + case adaptive: AdaptiveSparkPlanExec => adaptive.executedPlan + case other => other + } + val nativeScan = collectMaterializedPlans(plan).collectFirst { + case scan: NativeIcebergTableScanExec + if tableName.isEmpty || scan.basedScan.scan.description().contains(tableName) => + scan + } + assert( + nativeScan.nonEmpty, + s""" + |No NativeIcebergTableScanExec found. + | + |Materialized plan: + |${plan.treeString} + |""".stripMargin) nativeScan.get } + private def collectMaterializedPlans(plan: SparkPlan): Seq[SparkPlan] = { + val actualPlan = plan match { + case stage: QueryStageExec => stage.plan + case other => other + } + actualPlan +: actualPlan.children.flatMap(collectMaterializedPlans) + } + test("native iceberg scan respects SinglePartition for global sort correctness") { withTable("local.db.t_global_sort") { sql("create table local.db.t_global_sort (id int, value string) using iceberg")