Skip to content

Commit 2e5c22a

Browse files
committed
Fix: Handle GroupQueueCallbacks to Avoid Miscounts
1 parent 4a9c8aa commit 2e5c22a

2 files changed

Lines changed: 12 additions & 2 deletions

File tree

src/main/java/org/apache/sysds/runtime/ooc/stream/MergedOOCStream.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,17 @@ public MergedOOCStream(List<OOCStream<T>> sources) {
8585
if(_failed.get())
8686
return;
8787

88+
if(cb instanceof OOCStream.GroupQueueCallback<?>) {
89+
OOCStream.GroupQueueCallback<T> group = (OOCStream.GroupQueueCallback<T>) cb;
90+
for(int i = 0; i < group.size(); i++) {
91+
OOCStream.QueueCallback<T> sub = group.getCallback(i);
92+
try(sub) {
93+
_taskQueue.enqueue(sub.keepOpen());
94+
}
95+
}
96+
return;
97+
}
98+
8899
_taskQueue.enqueue(cb.keepOpen());
89100
}
90101
}

src/test/java/org/apache/sysds/test/functions/ooc/CBindTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ public void setUp() {
5454
@Test
5555
public void testCBindAppendBlock() { runCBindTest(1000, 1000, 1000, 1000);}
5656

57-
// TODO: fix OOC internals
58-
// @Test
57+
@Test
5958
public void testCBindAppendBlockTwoLeftBlocks() {runCBindTest(1000, 2000, 1000, 1000);}
6059

6160
@Test

0 commit comments

Comments
 (0)