Skip to content

Commit 1354593

Browse files
committed
test: add CometNativeWriteExec plan assertion to SPARK-38811 tests
- Add assertCometNativeWrite() helper to capture and validate the execution plan
- Add captureSqlWritePlan() using a QueryExecutionListener to intercept plans
- Update all 8 SPARK-38811 tests to verify CometNativeWriteExec is used
- Update the ignored test's comment with a detailed explanation of the catalog cache issue
1 parent 69083c3 commit 1354593

File tree

1 file changed

+54
-8
lines changed

1 file changed

+54
-8
lines changed

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,51 @@ class CometParquetWriterSuite extends CometTestBase {
386386
}
387387
}
388388

389+
/**
 * Runs the given INSERT statement and asserts that the captured physical plan
 * contains a `CometNativeWriteExec`, either directly or as the child of a
 * `DataWritingCommandExec`. Fails with the full plan tree string otherwise.
 *
 * @param insertSql the INSERT statement to execute and inspect
 */
private def assertCometNativeWrite(insertSql: String): Unit = {
  val executedPlan = captureSqlWritePlan(insertSql)
  // A node counts as a native write if it is the native write exec itself,
  // or a data-writing command whose subtree contains one.
  def isNativeWrite(node: SparkPlan): Boolean = node match {
    case _: CometNativeWriteExec => true
    case d: DataWritingCommandExec =>
      d.child.exists(_.isInstanceOf[CometNativeWriteExec])
    case _ => false
  }
  assert(
    executedPlan.exists(isNativeWrite),
    s"Expected CometNativeWriteExec in plan, but not found:\n${executedPlan.treeString}")
}
401+
402+
/**
 * Executes the given SQL text and returns the executed physical plan
 * (with any AQE wrappers stripped) for the resulting command, captured via a
 * temporary [[org.apache.spark.sql.util.QueryExecutionListener]].
 *
 * Listener events are delivered asynchronously on the listener-bus thread,
 * so this method polls (time-bounded) until the plan arrives.
 *
 * @param sqlText the SQL statement (typically an INSERT) to execute
 * @return the stripped executed plan of the command
 */
private def captureSqlWritePlan(sqlText: String): SparkPlan = {
  // BUGFIX: the listener callback runs on a separate listener-bus thread;
  // without @volatile the polling loop below may never observe the write
  // (Java Memory Model visibility), spinning until the timeout.
  @volatile var capturedPlan: Option[QueryExecution] = None

  val listener = new org.apache.spark.sql.util.QueryExecutionListener {
    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
      // SQL commands (e.g. INSERT) are reported with funcName "command".
      if (funcName == "command") {
        capturedPlan = Some(qe)
      }
    }
    override def onFailure(
        funcName: String,
        qe: QueryExecution,
        exception: Exception): Unit = {}
  }

  spark.listenerManager.register(listener)
  try {
    sql(sqlText)
    // Wait up to 5s for the asynchronous listener callback. A wall-clock
    // deadline keeps the wait time-bounded even if sleep granularity varies.
    val deadlineMs = System.currentTimeMillis() + 5000L
    while (capturedPlan.isEmpty && System.currentTimeMillis() < deadlineMs) {
      Thread.sleep(50)
    }
    capturedPlan match {
      case Some(qe) => stripAQEPlan(qe.executedPlan)
      case None => fail(s"Failed to capture plan for: $sqlText")
    }
  } finally {
    // Always unregister so the listener cannot leak into other tests.
    spark.listenerManager.unregister(listener)
  }
}
433+
389434
// SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests
390435
// Mirrors the Spark InsertSuite test to validate Comet native writer compatibility.
391436

@@ -394,7 +439,7 @@ class CometParquetWriterSuite extends CometTestBase {
394439
withTable("t") {
395440
sql("create table t(i boolean) using parquet")
396441
sql("alter table t add column s string default concat('abc', 'def')")
397-
sql("insert into t values(true, default)")
442+
assertCometNativeWrite("insert into t values(true, default)")
398443
checkAnswer(spark.table("t"), Row(true, "abcdef"))
399444
}
400445
}
@@ -406,7 +451,7 @@ class CometParquetWriterSuite extends CometTestBase {
406451
sql("create table t(i int) using parquet")
407452
sql("alter table t add column s bigint default 42")
408453
sql("alter table t add column x bigint default 43")
409-
sql("insert into t(i) values(1)")
454+
assertCometNativeWrite("insert into t(i) values(1)")
410455
checkAnswer(spark.table("t"), Row(1, 42, 43))
411456
}
412457
}
@@ -417,7 +462,7 @@ class CometParquetWriterSuite extends CometTestBase {
417462
withTable("t") {
418463
sql("create table t(i int) using parquet")
419464
sql("alter table t add columns s bigint default 42, x bigint default 43")
420-
sql("insert into t(i) values(1)")
465+
assertCometNativeWrite("insert into t(i) values(1)")
421466
checkAnswer(spark.table("t"), Row(1, 42, 43))
422467
}
423468
}
@@ -429,7 +474,7 @@ class CometParquetWriterSuite extends CometTestBase {
429474
sql("create table t(i int) using parquet")
430475
sql("alter table t add column s bigint default 42")
431476
sql("alter table t add column x bigint")
432-
sql("insert into t(i) values(1)")
477+
assertCometNativeWrite("insert into t(i) values(1)")
433478
checkAnswer(spark.table("t"), Row(1, 42, null))
434479
}
435480
}
@@ -440,7 +485,7 @@ class CometParquetWriterSuite extends CometTestBase {
440485
withTable("t") {
441486
sql("create table t(i boolean) using parquet")
442487
sql("alter table t add column s bigint default 41 + 1")
443-
sql("insert into t(i) values(default)")
488+
assertCometNativeWrite("insert into t(i) values(default)")
444489
checkAnswer(spark.table("t"), Row(null, 42))
445490
}
446491
}
@@ -451,7 +496,7 @@ class CometParquetWriterSuite extends CometTestBase {
451496
withTable("t") {
452497
sql("create table t(i boolean default false) using parquet")
453498
sql("alter table t add column s bigint default 42")
454-
sql("insert into t values(false, default), (default, 42)")
499+
assertCometNativeWrite("insert into t values(false, default), (default, 42)")
455500
checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42)))
456501
}
457502
}
@@ -462,7 +507,8 @@ class CometParquetWriterSuite extends CometTestBase {
462507
withTable("t") {
463508
sql("create table t(i boolean) using parquet")
464509
sql("alter table t add column s bigint default 42")
465-
sql("insert into t select * from values (false, default) as tab(col, other)")
510+
assertCometNativeWrite(
511+
"insert into t select * from values (false, default) as tab(col, other)")
466512
checkAnswer(spark.table("t"), Row(false, 42))
467513
}
468514
}
@@ -473,7 +519,7 @@ class CometParquetWriterSuite extends CometTestBase {
473519
withTable("t") {
474520
sql("create table t(i boolean) using parquet")
475521
sql("alter table t add column s bigint default 42")
476-
sql("insert into t values (default, 43)")
522+
assertCometNativeWrite("insert into t values (default, 43)")
477523
checkAnswer(spark.table("t"), Row(null, 43))
478524
}
479525
}

0 commit comments

Comments
 (0)