Skip to content

Commit c5dfede

Browse files
Minni MittalCopilot
andcommitted
[VL] Fix streaming UTs by copying tags in EnsureLocalSortRequirements
When EnsureLocalSortRequirements adds a local SortExec to satisfy required child ordering, the new SortExec node was missing tags from the original child. This caused streaming test failures because StateStoreWriter and other stateful operators rely on tags (e.g., isStatefulOperatorStreamingRestore) to propagate streaming execution metadata. Fix: Call newChild.copyTagsFrom(originalChild) to preserve tags when wrapping a child plan with SortExec. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent f350a44 commit c5dfede

3 files changed

Lines changed: 7 additions & 6 deletions

File tree

gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/EnsureLocalSortRequirements.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ object EnsureLocalSortRequirements extends Rule[SparkPlan] {
3838
requiredOrdering: Seq[SortOrder]): SparkPlan = {
3939
// FIXME: HeuristicTransform is costly. Re-applying it may cause performance issues.
4040
val newChild = SortExec(requiredOrdering, global = false, child = originalChild)
41+
newChild.copyTagsFrom(originalChild)
4142
transform.apply(newChild)
4243
}
4344

gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,15 +1200,15 @@ class VeloxTestSettings extends BackendTestSettings {
12001200
enableSuite[GlutenFileStreamStressSuite]
12011201
// TODO: 4.x enableSuite[GlutenFlatMapGroupsInPandasWithStateDistributionSuite] // failures with GlutenPlugin
12021202
enableSuite[GlutenFlatMapGroupsInPandasWithStateSuite]
1203-
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
1204-
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateSuite]
1203+
enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
1204+
enableSuite[GlutenFlatMapGroupsWithStateSuite]
12051205
enableSuite[GlutenFlatMapGroupsWithStateWithInitialStateSuite]
12061206
enableSuite[GlutenGroupStateSuite]
12071207
enableSuite[GlutenLongOffsetSuite]
12081208
enableSuite[GlutenMemorySourceStressSuite]
12091209
// TODO: 4.x enableSuite[GlutenMultiStatefulOperatorsSuite] // 2 failures
12101210
enableSuite[GlutenReportSinkMetricsSuite]
1211-
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
1211+
enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
12121212
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingAggregationSuite]
12131213
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingDeduplicationSuite]
12141214
// TODO: 4.x enableSuite[GlutenStreamSuite]

gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1214,15 +1214,15 @@ class VeloxTestSettings extends BackendTestSettings {
12141214
enableSuite[GlutenFileStreamStressSuite]
12151215
// TODO: 4.x enableSuite[GlutenFlatMapGroupsInPandasWithStateDistributionSuite] // failures with GlutenPlugin
12161216
enableSuite[GlutenFlatMapGroupsInPandasWithStateSuite]
1217-
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
1218-
// TODO: 4.x enableSuite[GlutenFlatMapGroupsWithStateSuite]
1217+
enableSuite[GlutenFlatMapGroupsWithStateDistributionSuite]
1218+
enableSuite[GlutenFlatMapGroupsWithStateSuite]
12191219
enableSuite[GlutenFlatMapGroupsWithStateWithInitialStateSuite]
12201220
enableSuite[GlutenGroupStateSuite]
12211221
enableSuite[GlutenLongOffsetSuite]
12221222
enableSuite[GlutenMemorySourceStressSuite]
12231223
// TODO: 4.x enableSuite[GlutenMultiStatefulOperatorsSuite] // 2 failures
12241224
enableSuite[GlutenReportSinkMetricsSuite]
1225-
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
1225+
enableSuite[GlutenRocksDBStateStoreFlatMapGroupsWithStateSuite]
12261226
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingAggregationSuite]
12271227
// TODO: 4.x enableSuite[GlutenRocksDBStateStoreStreamingDeduplicationSuite]
12281228
// TODO: 4.x enableSuite[GlutenStreamSuite]

0 commit comments

Comments
 (0)