Closed
3 changes: 3 additions & 0 deletions marklogic-client-api/build.gradle
@@ -76,6 +76,9 @@ dependencies {

testImplementation 'org.skyscreamer:jsonassert:1.5.3'

// ArchUnit for verifying certain classes aren't used
testImplementation 'com.tngtech.archunit:archunit-junit5:1.4.1'

// Automatic loading of test framework implementation dependencies is deprecated.
// https://docs.gradle.org/current/userguide/upgrading_version_8.html#test_framework_implementation_dependencies
// Without this, once using JUnit 5.12 or higher, Gradle will not find any tests and report an error of:
@@ -37,8 +37,8 @@
import jakarta.mail.internet.ContentDisposition;
import jakarta.mail.internet.MimeMultipart;
import jakarta.mail.internet.ParseException;
import jakarta.mail.util.ByteArrayDataSource;
import jakarta.xml.bind.DatatypeConverter;
import com.marklogic.client.impl.okhttp.InputStreamDataSource;
import okhttp3.*;
import okhttp3.MultipartBody.Part;
import okhttp3.logging.HttpLoggingInterceptor;
@@ -5305,7 +5305,9 @@ static private <T> T getEntity(ResponseBody body, Class<T> as) {
} else if (as == MimeMultipart.class) {
MediaType mediaType = body.contentType();
String contentType = (mediaType != null) ? mediaType.toString() : "application/x-unknown-content-type";
ByteArrayDataSource dataSource = new ByteArrayDataSource(body.byteStream(), contentType);
// Use custom DataSource to avoid reading document into memory. Allows a user to use an
// InputStreamHandle to fetch the content without being surprised that all the data is in memory already.
InputStreamDataSource dataSource = new InputStreamDataSource(body.byteStream(), contentType);
return (T) new MimeMultipart(dataSource);
Comment on lines +5308 to 5311
Copilot AI Mar 10, 2026

In this MimeMultipart conversion, replacing ByteArrayDataSource with a streaming DataSource means the ResponseBody may no longer be fully consumed/closed during construction. If the returned MimeMultipart (or any BodyPart streams) are not fully consumed and closed by the caller, the underlying OkHttp response can remain open. Consider documenting/ensuring lifecycle management (e.g., requiring callers to close the originating CallResponse/ResponseBody, or eagerly parsing/closing when appropriate).
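
One way to picture the lifecycle concern raised above is the following minimal sketch. It is not code from this PR: `TrackingStream` and `readPrefixAndClose` are hypothetical names, and a plain `ByteArrayInputStream` stands in for the OkHttp response body stream. It only illustrates that try-with-resources closes a stream even when the caller stops reading before the end.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class StreamLifecycleSketch {

    /** Hypothetical stand-in for the response body stream; records whether close() was called. */
    static final class TrackingStream extends FilterInputStream {
        private boolean closed;

        TrackingStream(InputStream in) {
            super(in);
        }

        @Override
        public void close() throws IOException {
            closed = true;
            super.close();
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Reads at most {@code limit} bytes, then closes the stream even if data remains unread. */
    static byte[] readPrefixAndClose(InputStream body, int limit) {
        try (InputStream in = body) {
            return in.readNBytes(limit);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        TrackingStream body = new TrackingStream(
            new ByteArrayInputStream("part-1 part-2".getBytes(StandardCharsets.UTF_8)));
        byte[] prefix = readPrefixAndClose(body, 6);
        System.out.println(new String(prefix, StandardCharsets.UTF_8) + " closed=" + body.isClosed());
    }
}
```

With a streaming DataSource, the equivalent responsibility would fall on whoever holds the multipart or its part streams, which is why documenting it for callers seems worthwhile.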

} else if (as == File.class) {
// write out the response body to a temp file in the system temp folder
@@ -6055,12 +6057,10 @@ void setResponse(Response response) {
setNull(true);
return;
}
ByteArrayDataSource dataSource = new ByteArrayDataSource(
responseBody.byteStream(), contentType.toString()
);
// Use custom DataSource to avoid reading document into memory. Allows a user to use an
// InputStreamHandle to fetch the content without being surprised that all the data is in memory already.
InputStreamDataSource dataSource = new InputStreamDataSource(responseBody.byteStream(), contentType.toString());
setMultipart(new MimeMultipart(dataSource));
Comment on lines +6060 to 6063
Copilot AI Mar 10, 2026

Switching multipart handling to InputStreamDataSource changes this code path from eagerly buffering the entire response body (ByteArrayDataSource) to potentially leaving the underlying OkHttp ResponseBody open until the multipart is fully consumed. MultipleCallResponseImpl does not override close(), and the various asStreamOf*/asArrayOf* methods don’t close per-part InputStreams, so it’s easy for callers to accidentally leak the response body/connection. Consider (1) overriding close() on MultipleCallResponseImpl to close the stored Response/ResponseBody, and (2) ensuring part streams are closed when converting them to bytes/strings/readers.
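
The two suggestions above could look roughly like this. This is a sketch under assumptions, not the actual `MultipleCallResponseImpl`: `ResponseHolder` and `partToBytes` are hypothetical names, and a generic `Closeable` stands in for the stored OkHttp `Response`.

```java
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class MultipartCloseSketch {

    /** Hypothetical holder for a response; not the real MultipleCallResponseImpl. */
    static final class ResponseHolder implements Closeable {
        private final Closeable response;
        private boolean closed;

        ResponseHolder(Closeable response) {
            this.response = response;
        }

        /** Closes the stored response once; repeated calls are no-ops. */
        @Override
        public void close() {
            if (closed) {
                return;
            }
            closed = true;
            try {
                response.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    /** Copies one part into memory and always closes the part stream afterwards. */
    static byte[] partToBytes(InputStream part) {
        try (InputStream in = part) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] closeCalls = {0};
        try (ResponseHolder holder = new ResponseHolder(() -> closeCalls[0]++)) {
            byte[] bytes = partToBytes(new ByteArrayInputStream(new byte[] {1, 2, 3}));
            System.out.println("bytes read: " + bytes.length);
        }
        System.out.println("close calls: " + closeCalls[0]);
    }
}
```

Making `close()` idempotent keeps it safe to call from both a try-with-resources block and any explicit cleanup path.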

} catch (IOException e) {
throw new MarkLogicIOException(e);
} catch (MessagingException e) {
throw new MarkLogicIOException(e);
}
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.impl.okhttp;

import jakarta.activation.DataSource;

import java.io.InputStream;
import java.io.OutputStream;

/**
* A streaming DataSource implementation that wraps an InputStream without buffering it into memory.
* This is a critical component for enabling true streaming of large documents from MarkLogic,
* avoiding OutOfMemoryErrors when processing large result sets.
* <p>
* Unlike ByteArrayDataSource (which loads the entire stream into a byte array), this implementation
* preserves the streaming nature of the underlying InputStream, allowing documents to be processed
* incrementally as they are read from the network.
* <p>
* Note: This DataSource is read-only. The getOutputStream() method throws UnsupportedOperationException.
*/
public class InputStreamDataSource implements DataSource {

private final InputStream inputStream;
private final String contentType;

/**
* Creates a new InputStreamDataSource.
*
* @param inputStream the InputStream to wrap (will not be buffered into memory)
* @param contentType the MIME type of the data
*/
public InputStreamDataSource(InputStream inputStream, String contentType) {
this.inputStream = inputStream;
this.contentType = contentType;
}

@Override
public InputStream getInputStream() {
return inputStream;
}

@Override
public OutputStream getOutputStream() {
throw new UnsupportedOperationException("InputStreamDataSource is read-only");
}

@Override
public String getContentType() {
return contentType;
}

@Override
public String getName() {
return null;
}
Comment on lines +54 to +56
Copilot AI Mar 10, 2026

InputStreamDataSource.getName() currently returns null. DataSource implementations typically return a non-null, stable name (useful for debugging/logging and some consumers may assume non-null). Consider returning a constant like the class name, or a name derived from contentType if appropriate.
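
A possible shape for that change, sketched without the Jakarta dependency so it runs on its own: `ReadOnlySource` is a hypothetical stand-in for the read side of `jakarta.activation.DataSource`, and the constant name is an assumption, not a value taken from this PR.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class NamedDataSourceSketch {

    /** Hypothetical stand-in for the read side of jakarta.activation.DataSource. */
    interface ReadOnlySource {
        InputStream getInputStream();
        String getContentType();
        String getName();
    }

    /** Wraps a stream without buffering it and reports a stable, non-null name. */
    static final class NamedInputStreamSource implements ReadOnlySource {
        // Assumed name; any constant, non-null value would serve the same purpose.
        private static final String NAME = "InputStreamDataSource";

        private final InputStream inputStream;
        private final String contentType;

        NamedInputStreamSource(InputStream inputStream, String contentType) {
            this.inputStream = inputStream;
            this.contentType = contentType;
        }

        @Override
        public InputStream getInputStream() {
            return inputStream;
        }

        @Override
        public String getContentType() {
            return contentType;
        }

        @Override
        public String getName() {
            return NAME;
        }
    }

    public static void main(String[] args) {
        ReadOnlySource source = new NamedInputStreamSource(
            new ByteArrayInputStream(new byte[0]), "multipart/mixed");
        System.out.println(source.getName() + " (" + source.getContentType() + ")");
    }
}
```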

}
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2010-2026 Progress Software Corporation and/or its subsidiaries or affiliates. All Rights Reserved.
*/
package com.marklogic.client.test;

import com.marklogic.client.impl.okhttp.PartIterator;
import com.tngtech.archunit.core.domain.JavaClasses;
import com.tngtech.archunit.core.importer.ClassFileImporter;
import com.tngtech.archunit.core.importer.ImportOption;
import com.tngtech.archunit.lang.ArchRule;
import jakarta.mail.util.ByteArrayDataSource;
import org.junit.jupiter.api.Test;

import static com.tngtech.archunit.lang.syntax.ArchRuleDefinition.noClasses;

/**
* Architecture tests to ensure streaming best practices are followed.
* <p>
* ByteArrayDataSource defeats streaming by loading entire InputStreams into memory.
* We use InputStreamDataSource instead to enable true streaming for large documents.
*/
class VerifyByteArrayDataSourceIsNotUsedTest {

private final JavaClasses classes = new ClassFileImporter()
.withImportOption(ImportOption.Predefined.DO_NOT_INCLUDE_TESTS)
.importPackages("com.marklogic.client");

@Test
void shouldNotUseByteArrayDataSourceInProduction() {
ArchRule rule = noClasses()
// PartIterator can use ByteArrayDataSource because for an eval/invoke use case, it's very likely the user
// wants the results in memory so they can perform some operation on them. If this proves false in the
// future, PartIterator can be adjusted to use InputStreamDataSource instead.
.that().doNotHaveSimpleName(PartIterator.class.getSimpleName())

.should().dependOnClassesThat().haveSimpleName(ByteArrayDataSource.class.getSimpleName())
.because("MLE-27509 identifies a problem where a multipart response was having each part " +
"processed with the ByteArrayDataSource, which loads the entire stream into memory. This is " +
"surprising to any user using InputStreamHandle to access the content of a document, as that user " +
"is likely expecting to stream document from MarkLogic to some target. The InputStreamDataSource " +
"class was created to avoiding reading the contents of the document into an in-memory byte " +
"array, thus allowing for streaming reads to occur via a multipart response.");
Comment on lines +39 to +42
Copilot AI Mar 10, 2026

The ArchUnit failure message has a couple grammatical issues that make it harder to read: "expecting to stream document" is missing an article, and "was created to avoiding" should be "was created to avoid".


rule.check(classes);
}
}