Skip to content

Commit 24ae735

Browse files
baibaichenCopilot
andcommitted
[GLUTEN-11550][UT] Disable flaky GlutenSparkSessionJobTaggingAndCancellationSuite
The testGluten for 'Tags set from session are prefixed with session UUID' is flaky due to InheritableThreadLocal + ForkJoinPool timing issues. Disable the entire suite until root cause is fully resolved. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 6a9874d commit 24ae735

4 files changed

Lines changed: 4 additions & 136 deletions

File tree

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -813,8 +813,7 @@ class VeloxTestSettings extends BackendTestSettings {
813813
enableSuite[GlutenSessionStateSuite]
814814
// TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure
815815
enableSuite[GlutenSparkSessionBuilderSuite]
816-
enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
817-
.exclude("Tags set from session are prefixed with session UUID")
816+
// TODO: 4.x enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] // flaky
818817
enableSuite[GlutenTPCDSCollationQueryTestSuite]
819818
enableSuite[GlutenTPCDSModifiedPlanStabilitySuite]
820819
enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite]

gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,6 @@
1616
*/
1717
package org.apache.spark.sql
1818

19-
import org.apache.spark.SparkContext
20-
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
21-
import org.apache.spark.sql.execution.SQLExecution
22-
import org.apache.spark.util.ThreadUtils
23-
24-
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
25-
26-
import scala.concurrent.{ExecutionContext, Future}
27-
import scala.concurrent.duration._
28-
2919
class GlutenSparkSessionJobTaggingAndCancellationSuite
3020
extends SparkSessionJobTaggingAndCancellationSuite
31-
with GlutenTestSetWithSystemPropertyTrait {
32-
33-
// Override with testGluten because the original test uses ExecutionContext.global whose
34-
// ForkJoinPool reuses threads created before addTag("one"). InheritableThreadLocal (used by
35-
// managedJobTags) only copies values at thread creation time, so reused threads see an empty
36-
// tag map. This causes withSessionTagsApplied to not attach the user tag to the job, making
37-
// cancelJobsWithTagWithFuture return empty. We fix this by creating a fresh thread pool AFTER
38-
// addTag so that new threads inherit the InheritableThreadLocal values.
39-
testGluten("Tags set from session are prefixed with session UUID") {
40-
sc = new SparkContext("local[2]", "test")
41-
val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate()
42-
import session.implicits._
43-
44-
val sem = new Semaphore(0)
45-
sc.addSparkListener(new SparkListener {
46-
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
47-
sem.release()
48-
}
49-
})
50-
51-
session.addTag("one")
52-
53-
// Use a fresh thread pool created AFTER addTag so that new threads inherit
54-
// InheritableThreadLocal values (managedJobTags, threadUuid).
55-
val threadPool = Executors.newSingleThreadExecutor()
56-
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
57-
try {
58-
Future {
59-
session.range(1, 10000).map { i => Thread.sleep(100); i }.count()
60-
}
61-
62-
assert(sem.tryAcquire(1, 1, TimeUnit.MINUTES))
63-
val activeJobsFuture =
64-
session.sparkContext.cancelJobsWithTagWithFuture(
65-
session.managedJobTags.get()("one"),
66-
"reason")
67-
val activeJob = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds).head
68-
val actualTags = activeJob.properties
69-
.getProperty(SparkContext.SPARK_JOB_TAGS)
70-
.split(SparkContext.SPARK_JOB_TAGS_SEP)
71-
assert(
72-
actualTags.toSet == Set(
73-
session.sessionJobTag,
74-
s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one",
75-
SQLExecution.executionIdJobTag(
76-
session,
77-
activeJob.properties
78-
.get(SQLExecution.EXECUTION_ROOT_ID_KEY)
79-
.asInstanceOf[String]
80-
.toLong)
81-
))
82-
} finally {
83-
threadPool.shutdownNow()
84-
}
85-
}
86-
}
21+
with GlutenTestSetWithSystemPropertyTrait {}

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -797,8 +797,7 @@ class VeloxTestSettings extends BackendTestSettings {
797797
enableSuite[GlutenSessionStateSuite]
798798
// TODO: 4.x enableSuite[GlutenSetCommandSuite] // 1 failure
799799
enableSuite[GlutenSparkSessionBuilderSuite]
800-
enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite]
801-
.exclude("Tags set from session are prefixed with session UUID")
800+
// TODO: 4.x enableSuite[GlutenSparkSessionJobTaggingAndCancellationSuite] // flaky
802801
enableSuite[GlutenTPCDSCollationQueryTestSuite]
803802
enableSuite[GlutenTPCDSModifiedPlanStabilitySuite]
804803
enableSuite[GlutenTPCDSModifiedPlanStabilityWithStatsSuite]

gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenSparkSessionJobTaggingAndCancellationSuite.scala

Lines changed: 1 addition & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,71 +16,6 @@
1616
*/
1717
package org.apache.spark.sql
1818

19-
import org.apache.spark.SparkContext
20-
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
21-
import org.apache.spark.sql.execution.SQLExecution
22-
import org.apache.spark.util.ThreadUtils
23-
24-
import java.util.concurrent.{Executors, Semaphore, TimeUnit}
25-
26-
import scala.concurrent.{ExecutionContext, Future}
27-
import scala.concurrent.duration._
28-
2919
class GlutenSparkSessionJobTaggingAndCancellationSuite
3020
extends SparkSessionJobTaggingAndCancellationSuite
31-
with GlutenTestSetWithSystemPropertyTrait {
32-
33-
// Override with testGluten because the original test uses ExecutionContext.global whose
34-
// ForkJoinPool reuses threads created before addTag("one"). InheritableThreadLocal (used by
35-
// managedJobTags) only copies values at thread creation time, so reused threads see an empty
36-
// tag map. This causes withSessionTagsApplied to not attach the user tag to the job, making
37-
// cancelJobsWithTagWithFuture return empty. We fix this by creating a fresh thread pool AFTER
38-
// addTag so that new threads inherit the InheritableThreadLocal values.
39-
testGluten("Tags set from session are prefixed with session UUID") {
40-
sc = new SparkContext("local[2]", "test")
41-
val session = classic.SparkSession.builder().sparkContext(sc).getOrCreate()
42-
import session.implicits._
43-
44-
val sem = new Semaphore(0)
45-
sc.addSparkListener(new SparkListener {
46-
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
47-
sem.release()
48-
}
49-
})
50-
51-
session.addTag("one")
52-
53-
// Use a fresh thread pool created AFTER addTag so that new threads inherit
54-
// InheritableThreadLocal values (managedJobTags, threadUuid).
55-
val threadPool = Executors.newSingleThreadExecutor()
56-
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(threadPool)
57-
try {
58-
Future {
59-
session.range(1, 10000).map { i => Thread.sleep(100); i }.count()
60-
}
61-
62-
assert(sem.tryAcquire(1, 1, TimeUnit.MINUTES))
63-
val activeJobsFuture =
64-
session.sparkContext.cancelJobsWithTagWithFuture(
65-
session.managedJobTags.get()("one"),
66-
"reason")
67-
val activeJob = ThreadUtils.awaitResult(activeJobsFuture, 60.seconds).head
68-
val actualTags = activeJob.properties
69-
.getProperty(SparkContext.SPARK_JOB_TAGS)
70-
.split(SparkContext.SPARK_JOB_TAGS_SEP)
71-
assert(
72-
actualTags.toSet == Set(
73-
session.sessionJobTag,
74-
s"${session.sessionJobTag}-thread-${session.threadUuid.get()}-one",
75-
SQLExecution.executionIdJobTag(
76-
session,
77-
activeJob.properties
78-
.get(SQLExecution.EXECUTION_ROOT_ID_KEY)
79-
.asInstanceOf[String]
80-
.toLong)
81-
))
82-
} finally {
83-
threadPool.shutdownNow()
84-
}
85-
}
86-
}
21+
with GlutenTestSetWithSystemPropertyTrait {}

0 commit comments

Comments
 (0)