Skip to content

Commit f043c2c

Browse files
committed
Minor tweaks
1 parent aa4e567 commit f043c2c

8 files changed

Lines changed: 138 additions & 22 deletions

File tree

src/main/java/org/apache/groovy/parser/antlr4/AstBuilder.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,10 @@ public ReturnStatement visitYieldStmtAlt(final YieldStmtAltContext ctx) {
925925

926926
@Override
927927
public ExpressionStatement visitYieldReturnStmtAlt(final YieldReturnStmtAltContext ctx) {
928+
if (asyncContextDepth == 0) {
929+
throw createParsingFailedException(
930+
"`yield return` can only be used inside an async method or async closure", ctx);
931+
}
928932
Expression expr = (Expression) this.visit(ctx.expression());
929933
Expression yieldCall = AsyncTransformHelper.buildYieldReturnCall(expr);
930934
return configureAST(new ExpressionStatement(yieldCall), ctx);

src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
* blocking {@code put()} with a non-blocking {@code offer()} fallback
6969
* when the publisher thread is interrupted — this prevents both silent
7070
* item loss and consumer deadlock even under unexpected interrupts</li>
71+
* <li>Terminal callbacks atomically close the upstream side
72+
* ({@code closedRef}) and clear/cancel the stored subscription.
73+
* This makes post-terminal {@code onNext} calls from non-compliant
74+
* publishers harmless and releases resources promptly.</li>
7175
* <li>Back-pressure is enforced by requesting exactly one item after
7276
* each consumed element; demand is signalled <em>before</em>
7377
* {@code moveNext()} returns, preventing livelock when producer and
@@ -210,6 +214,9 @@ public void onComplete() {
210214
// Publisher completed before emitting — resolve to null
211215
if (done.compareAndSet(false, true)) {
212216
cf.complete(null);
217+
// Mirror onNext/onError cleanup for prompt resource release.
218+
Flow.Subscription sub = subRef.getAndSet(null);
219+
if (sub != null) sub.cancel();
213220
}
214221
}
215222
});
@@ -324,6 +331,10 @@ public void onNext(T item) {
324331

325332
@Override
326333
public void onError(Throwable t) {
334+
// First terminal signal wins. Ignore duplicate terminal callbacks.
335+
if (!closedRef.compareAndSet(false, true)) {
336+
return;
337+
}
327338
// Cancel subscription eagerly to release upstream resources
328339
Flow.Subscription sub = subRef.getAndSet(null);
329340
if (sub != null) sub.cancel();
@@ -345,6 +356,13 @@ public void onError(Throwable t) {
345356

346357
@Override
347358
public void onComplete() {
359+
// First terminal signal wins. Ignore duplicate terminal callbacks.
360+
if (!closedRef.compareAndSet(false, true)) {
361+
return;
362+
}
363+
// Clear subscription consistently with other terminal paths.
364+
Flow.Subscription sub = subRef.getAndSet(null);
365+
if (sub != null) sub.cancel();
348366
try {
349367
// Blocking put() guarantees the consumer will see the sentinel,
350368
// even if the queue was temporarily full from buffered values.

src/spec/doc/core-async-await.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,10 @@ until the consumer is ready for the next value. This design means:
405405
interrupted, preventing resource leaks
406406
* The generator can mix `yield return` with `await`, performing I/O between yields
407407
408+
NOTE: `yield return` can only appear inside an `async` method or `async` closure/lambda.
409+
Using it elsewhere produces a compile-time error:
410+
_"`yield return` can only be used inside an async method or async closure/lambda"_.
411+
408412
=== Basic Generator
409413
410414
[source,groovy]
@@ -619,6 +623,9 @@ one item at a time; demand for the next element is signalled _before_ the consum
619623
`moveNext()` awaitable completes, so the publisher can begin producing the next value while
620624
the consumer processes the current one.
621625
626+
The examples below use deterministic subscriber-handshake waiting with a bounded timeout
627+
before submitting values, avoiding timing races and unbounded waits in CI environments.
628+
622629
=== Awaiting a Single Value
623630
624631
[source,groovy]
@@ -1277,4 +1284,3 @@ JavaScript, C#, Kotlin, and Swift, for developers familiar with those languages.
12771284
| AwaitResult
12781285
| `result.isSuccess()` / `result.isFailure()` / `result.getOrElse { fallback }`
12791286
|===
1280-

src/spec/test/AsyncAwaitSpecTest.groovy

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -699,10 +699,15 @@ assert await(Awaitable.completeOnTimeoutMillis(slowCall(), "cached", 50)) == "ca
699699
import java.util.concurrent.SubmissionPublisher
700700
701701
def publisher = new SubmissionPublisher<String>()
702-
Thread.start {
703-
while (publisher.numberOfSubscribers == 0) {
704-
Thread.yield()
702+
def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L ->
703+
long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
704+
while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
705+
Thread.sleep(1)
705706
}
707+
assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher subscription"
708+
}
709+
Thread.start {
710+
waitForSubscriber(publisher)
706711
publisher.submit("hello from publisher")
707712
publisher.close()
708713
}
@@ -731,8 +736,15 @@ class StreamConsumer {
731736
732737
def publisher = new SubmissionPublisher<Integer>()
733738
def future = new StreamConsumer().consumeAll(publisher)
739+
def waitForSubscriber = { SubmissionPublisher pub, long timeoutMillis = 5_000L ->
740+
long deadline = System.nanoTime() + timeoutMillis * 1_000_000L
741+
while (pub.numberOfSubscribers == 0 && System.nanoTime() < deadline) {
742+
Thread.sleep(1)
743+
}
744+
assert pub.numberOfSubscribers > 0 : "Timed out waiting for publisher subscription"
745+
}
734746
Thread.start {
735-
Thread.sleep(50)
747+
waitForSubscriber(publisher)
736748
(1..5).each { publisher.submit(it) }
737749
publisher.close()
738750
}

src/test/groovy/org/codehaus/groovy/transform/AsyncConcurrencyTest.groovy renamed to src/test/groovy/org/apache/groovy/runtime/async/AsyncConcurrencyTest.groovy

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
* under the License.
1818
*/
1919

20-
package org.codehaus.groovy.transform
20+
package org.apache.groovy.runtime.async
2121

22-
import org.apache.groovy.runtime.async.AsyncStreamGenerator
2322
import org.junit.jupiter.api.Test
2423
import org.junit.jupiter.api.Timeout
2524
import org.junit.jupiter.api.DisplayName

src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import java.util.concurrent.FutureTask
4141
import java.util.concurrent.SubmissionPublisher
4242
import java.util.concurrent.TimeUnit
4343
import java.util.concurrent.atomic.AtomicBoolean
44+
import java.util.concurrent.atomic.AtomicReference
4445

4546
import static groovy.test.GroovyAssert.shouldFail
4647

@@ -1455,17 +1456,54 @@ class AsyncApiTest {
14551456

14561457
@Test
14571458
void testFlowPublisherToAwaitableOnCompleteWithoutValue() {
1459+
def cancelled = new AtomicBoolean(false)
14581460
def pub = new Flow.Publisher<String>() {
14591461
void subscribe(Flow.Subscriber<? super String> s) {
14601462
s.onSubscribe(new Flow.Subscription() {
14611463
void request(long n) {}
1462-
void cancel() {}
1464+
void cancel() { cancelled.set(true) }
14631465
})
14641466
s.onComplete()
14651467
}
14661468
}
14671469
Awaitable<String> aw = Awaitable.from(pub)
14681470
assert aw.get() == null
1471+
assert cancelled.get() : 'Subscription should be cancelled/cleared on terminal completion'
1472+
}
1473+
1474+
@Test
1475+
void testFlowPublisherAsyncStreamIgnoresLateOnNextAfterComplete() {
1476+
def producerRef = new AtomicReference<Thread>()
1477+
def pub = new Flow.Publisher<Integer>() {
1478+
void subscribe(Flow.Subscriber<? super Integer> s) {
1479+
s.onSubscribe(new Flow.Subscription() {
1480+
void request(long n) {}
1481+
void cancel() {}
1482+
})
1483+
producerRef.set(Thread.start {
1484+
// Non-compliant publisher: emits onNext after terminal signal.
1485+
s.onComplete()
1486+
for (int i = 0; i < 1000; i++) {
1487+
s.onNext(i)
1488+
}
1489+
})
1490+
}
1491+
}
1492+
AsyncStream<Integer> stream = AsyncStream.from(pub)
1493+
Thread producer = producerRef.get()
1494+
assert producer != null
1495+
try {
1496+
producer.join(1000)
1497+
assert !producer.isAlive() : 'late onNext after terminal should not block producer'
1498+
assert stream.moveNext().get() == false
1499+
assert stream.moveNext().get() == false
1500+
} finally {
1501+
stream.close()
1502+
if (producer != null && producer.isAlive()) {
1503+
producer.interrupt()
1504+
producer.join(1000)
1505+
}
1506+
}
14691507
}
14701508

14711509
@Test

src/test/groovy/org/codehaus/groovy/transform/AsyncAwaitSyntaxTest.groovy

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1762,6 +1762,36 @@ class AsyncAwaitSyntaxTest {
17621762
assert err.message.contains('defer') && err.message.contains('async')
17631763
}
17641764

1765+
@Test
1766+
void testYieldReturnOutsideAsyncMethodFailsCompileTime() {
1767+
def err = shouldFail CompilationFailedException, '''
1768+
class Svc {
1769+
def normalMethod() {
1770+
yield return 1
1771+
}
1772+
}
1773+
'''
1774+
assert err.message.contains('yield return') && err.message.contains('async')
1775+
}
1776+
1777+
@Test
1778+
void testYieldReturnOutsideAsyncClosureFailsCompileTime() {
1779+
def err = shouldFail CompilationFailedException, '''
1780+
def fn = { ->
1781+
yield return 1
1782+
}
1783+
'''
1784+
assert err.message.contains('yield return') && err.message.contains('async')
1785+
}
1786+
1787+
@Test
1788+
void testYieldReturnAtScriptTopLevelFailsCompileTime() {
1789+
def err = shouldFail CompilationFailedException, '''
1790+
yield return 1
1791+
'''
1792+
assert err.message.contains('yield return') && err.message.contains('async')
1793+
}
1794+
17651795
@Test
17661796
void testDeferInsideAsyncMethodSucceeds() {
17671797
assertScript '''
@@ -2026,14 +2056,12 @@ class AsyncAwaitSyntaxTest {
20262056
}
20272057

20282058
@Test
2029-
void testYieldReturnWithAnnotation() {
2059+
void testYieldReturnInAsyncMethod() {
20302060
assertScript '''
2031-
import groovy.transform.Async
20322061
import groovy.concurrent.AsyncStream
20332062
20342063
class Gen {
2035-
@Async
2036-
def items() {
2064+
async items() {
20372065
yield return "x"
20382066
yield return "y"
20392067
}

src/test/groovy/org/codehaus/groovy/transform/AsyncDeferFlowTest.groovy

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ import static groovy.test.GroovyAssert.assertScript
3333
* <p>{@code Flow.Publisher} instances are automatically adapted by the built-in
3434
* adapter, enabling seamless use with {@code await} and {@code for await}.
3535
*
36-
* <p><b>Test synchronisation:</b> Flow.Publisher tests use
36+
* <p><b>Test synchronisation:</b> Flow.Publisher tests use bounded waits on
3737
* {@code SubmissionPublisher.getNumberOfSubscribers()} to wait until the
38-
* subscription handshake is complete before submitting items. This
39-
* eliminates the race between subscription establishment and item
40-
* delivery that caused intermittent failures with hard-coded delays.
38+
* subscription handshake is complete before submitting items. This keeps
39+
* tests deterministic while preventing unbounded hangs on regressions.
4140
*/
4241
class AsyncDeferFlowTest {
4342

@@ -265,7 +264,9 @@ class AsyncDeferFlowTest {
265264
}
266265
def future = task()
267266
// Wait until the subscriber is registered with the publisher
268-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
267+
def deadline = System.nanoTime() + 5_000_000_000L
268+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
269+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
269270
subscribed.countDown()
270271
def result = await(future)
271272
assert result == 'hello'
@@ -288,7 +289,9 @@ class AsyncDeferFlowTest {
288289
def publisher = new java.util.concurrent.SubmissionPublisher<Integer>()
289290
def future = new FlowTest().consumePublisher(publisher)
290291
// Wait until the for-await loop has subscribed to the publisher
291-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
292+
def deadline = System.nanoTime() + 5_000_000_000L
293+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
294+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
292295
(1..5).each { publisher.submit(it) }
293296
publisher.close()
294297
def result = await(future)
@@ -312,7 +315,9 @@ class AsyncDeferFlowTest {
312315
def publisher = new java.util.concurrent.SubmissionPublisher<Integer>()
313316
def future = new FlowTest().consumeWithError(publisher)
314317
// Wait until the for-await loop has subscribed
315-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
318+
def deadline = System.nanoTime() + 5_000_000_000L
319+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
320+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
316321
publisher.submit(1)
317322
publisher.submit(2)
318323
publisher.closeExceptionally(new RuntimeException('stream-error'))
@@ -340,7 +345,9 @@ class AsyncDeferFlowTest {
340345
}
341346
def future = task()
342347
// Wait until the for-await loop has subscribed
343-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
348+
def deadline = System.nanoTime() + 5_000_000_000L
349+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
350+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
344351
['a', 'b', 'c'].each { publisher.submit(it) }
345352
publisher.close()
346353
def result = await(future)
@@ -368,7 +375,9 @@ class AsyncDeferFlowTest {
368375
def publisher = new java.util.concurrent.SubmissionPublisher<Integer>()
369376
def future = new CombinedTest().processStream(publisher)
370377
// Wait until the for-await loop has subscribed
371-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
378+
def deadline = System.nanoTime() + 5_000_000_000L
379+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
380+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
372381
(1..3).each { publisher.submit(it) }
373382
publisher.close()
374383
def result = await(future)
@@ -388,7 +397,9 @@ class AsyncDeferFlowTest {
388397
}
389398
def future = task()
390399
// Wait until the subscriber is registered
391-
while (publisher.getNumberOfSubscribers() == 0) { Thread.sleep(1) }
400+
def deadline = System.nanoTime() + 5_000_000_000L
401+
while (publisher.getNumberOfSubscribers() == 0 && System.nanoTime() < deadline) { Thread.sleep(1) }
402+
assert publisher.getNumberOfSubscribers() > 0 : 'Timed out waiting for publisher subscription'
392403
publisher.submit(42)
393404
publisher.submit(99) // second value ignored by await
394405
publisher.close()

0 commit comments

Comments
 (0)