Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bson/src/main/org/bson/BsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -921,10 +921,12 @@ private static class SerializationProxy implements Serializable {
new BsonDocumentCodec().encode(new BsonBinaryWriter(buffer), document, EncoderContext.builder().build());
this.bytes = new byte[buffer.size()];
int curPos = 0;
for (ByteBuf cur : buffer.getByteBuffers()) {
List<ByteBuf> byteBuffers = buffer.getByteBuffers();
for (ByteBuf cur : byteBuffers) {
System.arraycopy(cur.array(), cur.position(), bytes, curPos, cur.limit());
curPos += cur.position();
Comment thread
nhachicha marked this conversation as resolved.
}
byteBuffers.forEach(ByteBuf::release);
}
Comment thread
rozza marked this conversation as resolved.

private Object readResolve() {
Expand Down
6 changes: 6 additions & 0 deletions config/spotbugs/exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -290,4 +290,10 @@
<Bug pattern="NM_CLASS_NAMING_CONVENTION"/>
</Match>

<!-- DefaultServerMonitor -->
<Match>
<class name="com.mongodb.internal.connection.DefaultServerMonitor" />
<Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
</Match>
Comment thread
rozza marked this conversation as resolved.

</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.mongodb.internal.connection;

import com.mongodb.annotations.Sealed;
import com.mongodb.internal.ResourceUtil;
import com.mongodb.internal.VisibleForTesting;
import org.bson.BsonSerializationException;
import org.bson.ByteBuf;
import org.bson.io.OutputBuffer;
Expand All @@ -28,11 +31,28 @@

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static java.lang.String.format;

/**
* A BSON output implementation that uses pooled {@link ByteBuf} instances for efficient memory management.
*
* <h2>ByteBuf Ownership and Lifecycle</h2>
* <p>This class manages the lifecycle of {@link ByteBuf} instances obtained from the {@link BufferProvider}.
* The ownership model is as follows:</p>
* <ul>
* <li>Internal buffers are owned by this output and released when {@link #close()} is called or
* when {@link #truncateToPosition(int)} removes them.</li>
* <li>Methods that return {@link ByteBuf} instances (e.g., {@link #getByteBuffers()}) return
* duplicates with their own reference counts. <strong>Callers are responsible for releasing
* these buffers</strong> to prevent memory leaks.</li>
* <li>The {@link Branch} subclass merges its buffers into the parent on close, transferring
* ownership by retaining buffers before the branch releases them.</li>
* </ul>
*
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
@Sealed
public class ByteBufferBsonOutput extends OutputBuffer {

private static final int MAX_SHIFT = 31;
Expand All @@ -50,6 +70,9 @@ public class ByteBufferBsonOutput extends OutputBuffer {
/**
* Construct an instance that uses the given buffer provider to allocate byte buffers as needs as it grows.
*
* <p>The buffer provider is used to allocate new {@link ByteBuf} instances as the output grows.
* All allocated buffers are owned by this output and will be released when {@link #close()} is called.</p>
*
* @param bufferProvider the non-null buffer provider
*/
public ByteBufferBsonOutput(final BufferProvider bufferProvider) {
Expand All @@ -63,6 +86,10 @@ public ByteBufferBsonOutput(final BufferProvider bufferProvider) {
* If multiple branches are created, they are merged in the order they are {@linkplain ByteBufferBsonOutput.Branch#close() closed}.
* {@linkplain #close() Closing} this {@link ByteBufferBsonOutput} does not {@linkplain ByteBufferBsonOutput.Branch#close() close} the branch.
*
* <p><strong>ByteBuf Ownership:</strong> The branch allocates its own buffers. When the branch is closed,
* ownership of these buffers is transferred to the parent by retaining them before the branch releases
* its references. The parent then becomes responsible for releasing these buffers when it is closed.</p>
*
* @return A new {@link ByteBufferBsonOutput.Branch}.
*/
public ByteBufferBsonOutput.Branch branch() {
Expand Down Expand Up @@ -223,17 +250,46 @@ protected void write(final int absolutePosition, final int value) {
byteBuffer.put(bufferPositionPair.position++, (byte) value);
}

/**
* Returns a list of duplicated byte buffers containing the written data, flipped for reading.
*
* <p><strong>ByteBuf Ownership:</strong> The returned buffers are duplicates with their own
* reference counts (each starts with a reference count of 1). <strong>The caller is responsible
* for releasing each buffer</strong> when done to prevent memory leaks. Example usage:</p>
* <pre>{@code
* List<ByteBuf> buffers = output.getByteBuffers();
* try {
* // use buffers
* } finally {
* ResourceUtil.release(buffers);
* }
* }</pre>
* <p><strong>Note:</strong> These buffers must be released before this {@code ByteBufferBsonOutput} is closed.
* Otherwise there is a risk of the buffers being released back to the bufferProvider and data corruption.</p>
*
Comment thread
rozza marked this conversation as resolved.
* @return a list of duplicated buffers, flipped for reading
*/
@Override
public List<ByteBuf> getByteBuffers() {
ensureOpen();

List<ByteBuf> buffers = new ArrayList<>(bufferList.size());
for (final ByteBuf cur : bufferList) {
buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN).flip());
}
return buffers;
}

/**
* Returns a list of duplicated byte buffers without flipping them.
*
* <p><strong>ByteBuf Ownership:</strong> The returned buffers are duplicates with their own
* reference counts (each starts with a reference count of 1). <strong>The caller is responsible
* for releasing each buffer</strong> when done to prevent memory leaks.</p>
*
* @return a list of duplicated buffers
* @see #getByteBuffers()
*/
@VisibleForTesting(otherwise = PRIVATE)
public List<ByteBuf> getDuplicateByteBuffers() {
ensureOpen();

Expand All @@ -245,6 +301,13 @@ public List<ByteBuf> getDuplicateByteBuffers() {
}


/**
* {@inheritDoc}
*
* <p><strong>ByteBuf Management:</strong> This method obtains duplicated buffers via
* {@link #getByteBuffers()} and releases them after writing to the output stream,
* ensuring no buffer leaks occur.</p>
*/
@Override
public int pipe(final OutputStream out) throws IOException {
ensureOpen();
Expand All @@ -263,11 +326,20 @@ public int pipe(final OutputStream out) throws IOException {
total += cur.limit();
}
} finally {
byteBuffers.forEach(ByteBuf::release);
ResourceUtil.release(byteBuffers);
}
return total;
}

/**
* Truncates this output to the specified position, releasing any buffers that are no longer needed.
*
* <p><strong>ByteBuf Management:</strong> Any buffers beyond the new position are removed from
* the internal buffer list and released. This ensures no memory leaks when truncating.</p>
*
* @param newPosition the new position to truncate to
* @throws IllegalArgumentException if newPosition is negative or greater than the current position
*/
@Override
public void truncateToPosition(final int newPosition) {
ensureOpen();
Expand Down Expand Up @@ -306,13 +378,15 @@ public final void flush() throws IOException {
* {@inheritDoc}
* <p>
* Idempotent.</p>
*
* <p><strong>ByteBuf Management:</strong> Releases internal buffers and clears the buffer list.
* After this method returns, all buffers that were allocated by this output will have been fully released
* back to the buffer provider.</p>
*/
@Override
public void close() {
if (isOpen()) {
for (final ByteBuf cur : bufferList) {
cur.release();
}
ResourceUtil.release(bufferList);
currentByteBuffer = null;
bufferList.clear();
closed = true;
Expand Down Expand Up @@ -345,7 +419,14 @@ boolean isOpen() {
}

/**
* @see #branch()
* Merges a branch's buffers into this output.
*
* <p><strong>ByteBuf Ownership:</strong> This method retains each buffer from the branch before
* adding it to this output's buffer list. This is necessary because the branch will release its
* references when it closes. The retain ensures the buffers remain valid and are now owned by
* this output.</p>
*
* @param branch the branch to merge
*/
private void merge(final ByteBufferBsonOutput branch) {
assertTrue(branch instanceof ByteBufferBsonOutput.Branch);
Expand All @@ -356,6 +437,20 @@ private void merge(final ByteBufferBsonOutput branch) {
currentByteBuffer = null;
}

/**
* A branch of a {@link ByteBufferBsonOutput} that can be merged back into its parent.
*
* <p><strong>ByteBuf Ownership:</strong> A branch allocates its own buffers independently.
* When {@link #close()} is called:</p>
* <ol>
* <li>The parent's {@link ByteBufferBsonOutput#merge(ByteBufferBsonOutput)} method is called,
* which retains all buffers in this branch.</li>
* <li>Then {@code super.close()} is called, which releases the branch's references to the buffers.</li>
* </ol>
* <p>The retain/release sequence ensures buffers are safely transferred to the parent without leaks.</p>
*
* @see #branch()
*/
public static final class Branch extends ByteBufferBsonOutput {
private final ByteBufferBsonOutput parent;

Expand All @@ -365,6 +460,16 @@ private Branch(final ByteBufferBsonOutput parent) {
}

/**
* Closes this branch and merges its data into the parent output.
*
* <p><strong>ByteBuf Ownership:</strong> On close, this branch's buffers are transferred
* to the parent. The parent retains the buffers (incrementing reference counts), and then
* this branch releases only its own single reference. The parent
* becomes the sole owner of the buffers and is responsible for releasing them.</p>
*
* <p>Idempotent. If already closed, this method does nothing.</p>
*
* @throws AssertionError if the parent has been closed before this branch
* @see #branch()
*/
@Override
Expand Down
Loading