Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 27 additions & 49 deletions src/org/rascalmpl/library/util/Webclient.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,19 +219,33 @@ private class WriterBodyPublisher implements BodyPublisher {

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
final var publisher = new OutputStreamPublisher(subscriber);

// without this asynchronous task, the publisher subscription
// will not have completed before the first write comes.
// it remains a question if we have a race here.
executor.submit(() -> {
try (OutputStreamWriter w = new OutputStreamWriter(publisher)) {
writerConsumer.accept(w);
// we have to wait till we get a callback from the subscription after
// we confirm it's subscription
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (n <= 0) {
subscriber.onError(new RuntimeException("invalid request"));
return;
}
// without this asynchronous task, the publisher subscription
// will not have completed before the first write comes.
// it remains a question if we have a race here.
executor.submit(() -> {
try (OutputStreamWriter w = new OutputStreamWriter(new OutputStreamPublisher(subscriber))) {
writerConsumer.accept(w);
}
catch (IOException e) {
throw RuntimeExceptionFactory.io(e);
}
}, true);
}
catch (IOException e) {
throw RuntimeExceptionFactory.io(e);

@Override
public void cancel() {
subscriber.onComplete();
}
}, true);
});
}

@Override
Expand Down Expand Up @@ -273,65 +287,29 @@ public OutputStreamPublisher(Subscriber<? super ByteBuffer> subscriber) {
super(new PublishingStream(subscriber));
}
/**
* The buffed outputstream will take care to collect the bytes untill there's a decent chunk to forward to the consumers
* The buffed outputstream will take care to collect the bytes untill there's a decent chunk to forward to the subscriber
*/
private static class PublishingStream extends OutputStream implements Subscription {
private static class PublishingStream extends OutputStream {
private final Subscriber<? super ByteBuffer> subscriber;
CountDownLatch latch = new CountDownLatch(1);

public PublishingStream(Subscriber<? super ByteBuffer> subscriber) {
this.subscriber = subscriber;
subscriber.onSubscribe(this);
}

/**
* If we don't wait for the first call to `request` the HttpClient
* framework is (sometimes) not ready to accept the first call to `onNext`
* and throws NPEs. This semaphor avoids the situation alltogether.
*/
private void waitForFirstRequest() {
try {
// await is very fast after the count has gone to 0.
latch.await();
}
catch (InterruptedException e) {
// do nothing
}
}

@Override
public void write(int b) throws IOException {
waitForFirstRequest();
subscriber.onNext(ByteBuffer.wrap(new byte[] { (byte)(b & 0xFF) }));
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
waitForFirstRequest();
subscriber.onNext(ByteBuffer.wrap(b, off, len).asReadOnlyBuffer());
}

@Override
public void close() throws IOException {
waitForFirstRequest();
subscriber.onComplete();
}

@Override
public void request(long n) {
// open the stream
latch.countDown();
}

@Override
public void cancel() {
try {
close();
}
catch (IOException e) {
// ignore
}
}
}
}

Expand Down
Loading