Skip to content

Commit 98c29c0

Browse files
committed
Minor tweaks
1 parent 1bde6f8 commit 98c29c0

4 files changed

Lines changed: 122 additions & 46 deletions

File tree

src/main/java/org/apache/groovy/runtime/async/AsyncStreamGenerator.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ public class AsyncStreamGenerator<T> implements AsyncStream<T> {
8787

8888
private static final Object DONE = new Object();
8989

90+
/**
91+
* Cached awaitables for the two common {@code moveNext()} outcomes.
92+
* Eliminates per-call object allocation on the hot path.
93+
*/
94+
private static final Awaitable<Boolean> MOVE_NEXT_TRUE = Awaitable.of(Boolean.TRUE);
95+
private static final Awaitable<Boolean> MOVE_NEXT_FALSE = Awaitable.of(Boolean.FALSE);
96+
9097
private final SynchronousQueue<Object> queue = new SynchronousQueue<>();
9198
private final AtomicBoolean closed = new AtomicBoolean(false);
9299
private final AtomicReference<Thread> producerThread = new AtomicReference<>();
@@ -231,7 +238,7 @@ public void error(Throwable t) {
231238
@SuppressWarnings("unchecked")
232239
public Awaitable<Boolean> moveNext() {
233240
if (closed.get()) {
234-
return Awaitable.of(false);
241+
return MOVE_NEXT_FALSE;
235242
}
236243
// Enforce single-consumer semantics: only one thread may call moveNext()
237244
// at a time. A concurrent second caller would overwrite consumerThread,
@@ -252,13 +259,13 @@ public Awaitable<Boolean> moveNext() {
252259
// re-check the consumer would block in queue.take() indefinitely.
253260
if (closed.get()) {
254261
consumerThread.compareAndSet(currentThread, null);
255-
return Awaitable.of(false);
262+
return MOVE_NEXT_FALSE;
256263
}
257264
try {
258265
Object next = queue.take();
259266
if (next == DONE) {
260267
closed.set(true);
261-
return Awaitable.of(false);
268+
return MOVE_NEXT_FALSE;
262269
}
263270
if (next instanceof ErrorItem ei) {
264271
closed.set(true);
@@ -267,10 +274,10 @@ public Awaitable<Boolean> moveNext() {
267274
throw AsyncSupport.sneakyThrow(cause);
268275
}
269276
current = (T) ((Item) next).value;
270-
return Awaitable.of(true);
277+
return MOVE_NEXT_TRUE;
271278
} catch (InterruptedException e) {
272279
if (closed.get()) {
273-
return Awaitable.of(false);
280+
return MOVE_NEXT_FALSE;
274281
}
275282
Thread.currentThread().interrupt();
276283
throw interrupted("Interrupted during moveNext", e);

src/main/java/org/apache/groovy/runtime/async/FlowPublisherAdapter.java

Lines changed: 64 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,13 @@
6565
* <li>§2.5 — duplicate {@code onSubscribe} cancels the second subscription</li>
6666
* <li>§2.13 — {@code null} items in {@code onNext} are rejected immediately</li>
6767
* <li>All signals ({@code onNext}, {@code onError}, {@code onComplete}) use
68-
* blocking {@code put()} to guarantee delivery even under queue
69-
* contention</li>
68+
* blocking {@code put()} with a non-blocking {@code offer()} fallback
69+
* when the publisher thread is interrupted — this prevents both silent
70+
* item loss and consumer deadlock even under unexpected interrupts</li>
7071
* <li>Back-pressure is enforced by requesting exactly one item after
71-
* each consumed element; demand is signalled <em>before</em> the
72-
* consumer's {@code moveNext()} awaitable completes, preventing
73-
* livelock when producer and consumer share the same thread pool</li>
72+
* each consumed element; demand is signalled <em>before</em>
73+
* {@code moveNext()} returns, preventing livelock when producer and
74+
* consumer share the same thread pool</li>
7475
* </ul>
7576
*
7677
* @see groovy.concurrent.AwaitableAdapterRegistry
@@ -86,6 +87,14 @@ public class FlowPublisherAdapter implements AwaitableAdapter {
8687
*/
8788
private static final int QUEUE_CAPACITY = 256;
8889

90+
/**
91+
* Cached awaitables for the two common {@code moveNext()} outcomes.
92+
* Eliminates per-call {@link CompletableFuture} + {@link GroovyPromise}
93+
* allocation on the hot path (every element and stream-end).
94+
*/
95+
private static final Awaitable<Boolean> MOVE_NEXT_TRUE = Awaitable.of(Boolean.TRUE);
96+
private static final Awaitable<Boolean> MOVE_NEXT_FALSE = Awaitable.of(Boolean.FALSE);
97+
8998
/**
9099
* Returns {@code true} if the given type is assignable to
91100
* {@link Flow.Publisher}, enabling single-value {@code await}.
@@ -234,9 +243,10 @@ private record ErrorSignal(Throwable error) {
234243
* share the same thread pool.</p>
235244
*
236245
* <p>The internal bounded queue (capacity {@value QUEUE_CAPACITY})
237-
* absorbs minor timing jitter between producer and consumer. All
238-
* signals use blocking {@code put()}, ensuring no items or terminal
239-
* events are silently dropped.</p>
246+
* absorbs minor timing jitter between producer and consumer. Signals
247+
* use blocking {@code put()} for normal delivery with a non-blocking
248+
* {@code offer()} fallback when the publisher thread is interrupted —
249+
* ensuring no items or terminal events are silently dropped.</p>
240250
*
241251
* <p><b>Resource management:</b> When the consumer calls
242252
* {@link AsyncStream#close()} (e.g. via {@code break} in a
@@ -293,6 +303,21 @@ public void onNext(T item) {
293303
queue.put(new ValueSignal<>(item));
294304
} catch (InterruptedException ie) {
295305
Thread.currentThread().interrupt();
306+
// Blocking put() was interrupted. Fall back to non-blocking
307+
// offer() so the item still reaches the consumer. With
308+
// one-at-a-time demand the queue is almost never full, so
309+
// offer() effectively always succeeds. If it doesn't
310+
// (misbehaving publisher overfilling the queue), cancel
311+
// upstream and inject an error signal to terminate the
312+
// consumer cleanly instead of silently dropping the item.
313+
if (!queue.offer(new ValueSignal<>(item))) {
314+
Flow.Subscription sub = subRef.getAndSet(null);
315+
if (sub != null) sub.cancel();
316+
closedRef.set(true);
317+
queue.offer(new ErrorSignal(
318+
new CancellationException(
319+
"Item delivery interrupted and queue full")));
320+
}
296321
}
297322
}
298323
}
@@ -308,6 +333,13 @@ public void onError(Throwable t) {
308333
queue.put(new ErrorSignal(t));
309334
} catch (InterruptedException ie) {
310335
Thread.currentThread().interrupt();
336+
// Blocking put() was interrupted. Fall back to non-blocking
337+
// offer(). If that also fails (queue full), set streamClosed
338+
// so the consumer's next moveNext() returns false instead of
339+
// blocking indefinitely.
340+
if (!queue.offer(new ErrorSignal(t))) {
341+
streamClosed.set(true);
342+
}
311343
}
312344
}
313345

@@ -319,6 +351,11 @@ public void onComplete() {
319351
queue.put(COMPLETE_SENTINEL);
320352
} catch (InterruptedException ie) {
321353
Thread.currentThread().interrupt();
354+
// Same fallback as onError: try non-blocking offer,
355+
// set streamClosed if that also fails.
356+
if (!queue.offer(COMPLETE_SENTINEL)) {
357+
streamClosed.set(true);
358+
}
322359
}
323360
}
324361
});
@@ -328,47 +365,42 @@ public void onComplete() {
328365

329366
@Override
330367
public Awaitable<Boolean> moveNext() {
331-
// Fast path: stream already closed — no CF allocation needed
332368
if (streamClosed.get()) {
333-
return Awaitable.of(Boolean.FALSE);
369+
return MOVE_NEXT_FALSE;
334370
}
335371

336-
CompletableFuture<Boolean> cf = new CompletableFuture<>();
337372
try {
338373
Object signal = queue.take();
339374

340375
if (signal instanceof ValueSignal) {
341376
current = ((ValueSignal<T>) signal).value;
342-
// Signal demand for the next item BEFORE completing
343-
// the awaitable, so the publisher can begin producing
344-
// the next value while the consumer processes this one.
345-
// Ordering here is critical: if request(1) were called
346-
// after cf.complete(), the consumer could re-enter
347-
// moveNext() and block in take() before demand was
348-
// signalled, creating a livelock.
377+
// Signal demand BEFORE returning so the publisher can
378+
// begin producing the next value while the consumer
379+
// processes this one — prevents livelock when both
380+
// share a thread pool.
349381
Flow.Subscription sub = subRef.get();
350382
if (sub != null) sub.request(1);
351-
cf.complete(Boolean.TRUE);
352-
} else if (signal instanceof ErrorSignal) {
383+
return MOVE_NEXT_TRUE;
384+
} else if (signal instanceof ErrorSignal es) {
353385
streamClosed.set(true);
354-
cf.completeExceptionally(((ErrorSignal) signal).error);
386+
// Throw directly (matching AsyncStreamGenerator) to
387+
// avoid unnecessary CF allocation on the error path
388+
// and JDK 23+ CompletableFuture.get() wrapping issues.
389+
Throwable cause = es.error;
390+
if (cause instanceof Error err) throw err;
391+
throw AsyncSupport.sneakyThrow(cause);
355392
} else {
356-
// COMPLETE_SENTINEL or unknown — treat as end-of-stream
393+
// COMPLETE_SENTINEL end-of-stream
357394
streamClosed.set(true);
358-
cf.complete(Boolean.FALSE);
395+
return MOVE_NEXT_FALSE;
359396
}
360397
} catch (InterruptedException ie) {
361-
// Consumer thread was interrupted — throw directly as
362-
// CancellationException (matching AsyncStreamGenerator behaviour
363-
// and avoiding JDK 23+ CompletableFuture.get() wrapping issues)
364398
streamClosed.set(true);
365399
Thread.currentThread().interrupt();
366400
CancellationException ce = new CancellationException("Interrupted during moveNext");
367401
ce.initCause(ie);
368402
throw ce;
369403
}
370-
371-
return new GroovyPromise<>(cf);
372404
}
373405

374406
@Override
@@ -380,19 +412,15 @@ public T getCurrent() {
380412
public void close() {
381413
if (streamClosed.compareAndSet(false, true)) {
382414
closedRef.set(true);
383-
// Cancel the upstream subscription
384415
Flow.Subscription sub = subRef.getAndSet(null);
385416
if (sub != null) sub.cancel();
386417
// Drain the queue and inject a sentinel to unblock a
387418
// concurrent moveNext() that may be blocked in take().
388-
// Using blocking put() after clear() guarantees delivery;
389-
// since the queue is freshly cleared, put() will not block.
419+
// offer() is non-blocking and cannot throw InterruptedException;
420+
// after clear(), the queue is empty (capacity 256) so offer()
421+
// effectively always succeeds.
390422
queue.clear();
391-
try {
392-
queue.put(COMPLETE_SENTINEL);
393-
} catch (InterruptedException ie) {
394-
Thread.currentThread().interrupt();
395-
}
423+
queue.offer(COMPLETE_SENTINEL);
396424
}
397425
}
398426
};

src/spec/doc/core-async-await.adoc

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -985,8 +985,10 @@ writes (adapter registration) are rare, reads (every `await`) are frequent
985985
idempotent close/cancellation signalling
986986
* `Flow.Publisher` adaptation (`FlowPublisherAdapter`) — `AtomicReference<Subscription>` with CAS-guarded
987987
onSubscribe (§2.5 compliance), `AtomicBoolean` done-guard for single-value path, and close-aware
988-
queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` for
989-
guaranteed delivery; demand is signalled before the consumer's awaitable completes to prevent livelock
988+
queue signalling; all signals (`onNext`/`onError`/`onComplete`) use blocking `put()` with a
989+
non-blocking `offer()` fallback on interrupt, preventing both silent item loss and consumer deadlock;
990+
demand is signalled before `moveNext()` returns to prevent livelock; `moveNext()` uses cached
991+
`Awaitable` instances to eliminate per-call allocations on the hot path
990992
* Defer scopes — per-task `ArrayDeque`, confined to a single thread (no sharing)
991993
* `DELAY_SCHEDULER` — single daemon thread for non-blocking timer operations
992994
@@ -1032,6 +1034,43 @@ prevents the consumer from blocking indefinitely on a subsequent `moveNext()`
10321034
measure against unexpected thread interruption outside the normal close path.
10331035
|===
10341036
1037+
==== Flow.Publisher Adaptation Safety
1038+
1039+
The `FlowPublisherAdapter` bridges the push-based `Flow.Publisher` protocol into the pull-based
1040+
`AsyncStream` model. Several concurrency hazards are handled transparently:
1041+
1042+
[cols="2,3"]
1043+
|===
1044+
| Concern | Mechanism
1045+
1046+
| **Back-pressure**
1047+
| A bounded `LinkedBlockingQueue` (capacity 256) bridges push and pull. Demand is capped at
1048+
one item per `moveNext()` call via `request(1)`. Demand is signalled _before_ `moveNext()`
1049+
returns, preventing livelock when producer and consumer share a thread pool.
1050+
1051+
| **Interrupt-safe signal delivery**
1052+
| All subscriber callbacks (`onNext`, `onError`, `onComplete`) use blocking `put()` for normal
1053+
delivery, with a non-blocking `offer()` fallback when the publisher thread is interrupted.
1054+
If `offer()` also fails (queue full from a misbehaving publisher), `onNext` cancels the
1055+
upstream subscription and injects an error signal; `onError`/`onComplete` set the `streamClosed`
1056+
flag so the consumer's next `moveNext()` exits immediately.
1057+
1058+
| **Allocation-free hot path**
1059+
| `moveNext()` returns cached `Awaitable<Boolean>` instances for the value and end-of-stream
1060+
cases, eliminating per-call `CompletableFuture` + `GroovyPromise` allocation. Error signals
1061+
are thrown directly (matching `AsyncStreamGenerator` behaviour) rather than wrapped in a
1062+
failed `CompletableFuture`.
1063+
1064+
| **Non-blocking close**
1065+
| `close()` uses non-blocking `offer()` (instead of blocking `put()`) to inject the completion
1066+
sentinel after clearing the queue — this cannot throw `InterruptedException` and effectively
1067+
always succeeds because the queue was just drained.
1068+
1069+
| **Idempotent close**
1070+
| `close()` is guarded by `AtomicBoolean.compareAndSet()`, making it safe to call multiple
1071+
times from any thread without side effects.
1072+
|===
1073+
10351074
These mechanisms ensure that `yield return` / `for await` code remains as simple as writing
10361075
a synchronous loop, while the runtime handles all cross-thread coordination.
10371076

src/test/groovy/org/codehaus/groovy/transform/AsyncApiTest.groovy

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1503,11 +1503,13 @@ class AsyncApiTest {
15031503
AsyncStream<String> stream = AsyncStream.from(pub)
15041504
assert stream.moveNext().get() == true
15051505
assert stream.getCurrent() == 'item1'
1506+
// moveNext() throws the error directly (matching AsyncStreamGenerator
1507+
// behaviour) rather than wrapping it in a failed Awaitable
15061508
try {
1507-
stream.moveNext().get()
1509+
stream.moveNext()
15081510
assert false
1509-
} catch (ExecutionException e) {
1510-
assert e.cause instanceof IOException
1511+
} catch (IOException e) {
1512+
assert e.message == 'stream-err'
15111513
}
15121514
}
15131515

0 commit comments

Comments
 (0)