diff --git a/src/org/rascalmpl/library/util/Webclient.java b/src/org/rascalmpl/library/util/Webclient.java index 2ade187855..ea0ba1c453 100644 --- a/src/org/rascalmpl/library/util/Webclient.java +++ b/src/org/rascalmpl/library/util/Webclient.java @@ -219,19 +219,33 @@ private class WriterBodyPublisher implements BodyPublisher { @Override public void subscribe(Subscriber subscriber) { - final var publisher = new OutputStreamPublisher(subscriber); - - // without this asynchronous task, the publisher subscription - // will not have completed before the first write comes. - // it remains a question if we have a race here. - executor.submit(() -> { - try (OutputStreamWriter w = new OutputStreamWriter(publisher)) { - writerConsumer.accept(w); + // we have to wait till we get a callback from the subscription after + // we confirm it's subscription + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + if (n <= 0) { + subscriber.onError(new RuntimeException("invalid request")); + return; + } + // without this asynchronous task, the publisher subscription + // will not have completed before the first write comes. + // it remains a question if we have a race here. + executor.submit(() -> { + try (OutputStreamWriter w = new OutputStreamWriter(new OutputStreamPublisher(subscriber))) { + writerConsumer.accept(w); + } + catch (IOException e) { + throw RuntimeExceptionFactory.io(e); + } + }, true); } - catch (IOException e) { - throw RuntimeExceptionFactory.io(e); + + @Override + public void cancel() { + subscriber.onComplete(); } - }, true); + }); } @Override @@ -273,65 +287,29 @@ public OutputStreamPublisher(Subscriber subscriber) { super(new PublishingStream(subscriber)); } /** - * The buffed outputstream will take care to collect the bytes untill there's a decent chunk to forward to the consumers + * The buffed outputstream will take care to collect the bytes untill there's a decent chunk to forward to the subscriber */ - private static class PublishingStream extends OutputStream implements Subscription { + private static class PublishingStream extends OutputStream { private final Subscriber subscriber; - CountDownLatch latch = new CountDownLatch(1); public PublishingStream(Subscriber subscriber) { this.subscriber = subscriber; - subscriber.onSubscribe(this); - } - - /** - * If we don't wait for the first call to `request` the HttpClient - * framework is (sometimes) not ready to accept the first call to `onNext` - * and throws NPEs. This semaphor avoids the situation alltogether. - */ - private void waitForFirstRequest() { - try { - // await is very fast after the count has gone to 0. - latch.await(); - } - catch (InterruptedException e) { - // do nothing - } } @Override public void write(int b) throws IOException { - waitForFirstRequest(); subscriber.onNext(ByteBuffer.wrap(new byte[] { (byte)(b & 0xFF) })); } @Override public void write(byte[] b, int off, int len) throws IOException { - waitForFirstRequest(); subscriber.onNext(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer()); } @Override public void close() throws IOException { - waitForFirstRequest(); subscriber.onComplete(); } - - @Override - public void request(long n) { - // open the stream - latch.countDown(); - } - - @Override - public void cancel() { - try { - close(); - } - catch (IOException e) { - // ignore - } - } } }