Skip to content

Commit 98a264b

Browse files
authored
ByteBufBsonDocument & ByteBufBsonArray refactorings (#1874)
* ByteBufBsonDocument & ByteBufBsonArray refactoring * Now implement `Closeable` to track and manage lifecycle with try-with-resources * `ByteBufBsonDocument`: Added resource tracking, OP_MSG parsing, caching strategy * `ByteBufBsonArray`: Added resource tracking and cleanup CommandMessage Changes: * `getCommandDocument()` returns `ByteBufBsonDocument` (was `BsonDocument`) * Delegates document composition to `ByteBufBsonDocument` * Simplified `OP_MSG` document sequence parsing JAVA-6010 * Nit fixes and usability improvements If the document is hydrated allow continued use after resource closing. * PR updates * PR updates - ensure iterators track open resources and normalize the tests
1 parent 5560012 commit 98a264b

12 files changed

Lines changed: 2564 additions & 1197 deletions

driver-core/src/main/com/mongodb/internal/connection/ByteBufBsonArray.java

Lines changed: 105 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616

1717
package com.mongodb.internal.connection;
1818

19+
import com.mongodb.annotations.NotThreadSafe;
20+
import com.mongodb.internal.diagnostics.logging.Logger;
21+
import com.mongodb.internal.diagnostics.logging.Loggers;
1922
import org.bson.BsonArray;
2023
import org.bson.BsonBinaryReader;
2124
import org.bson.BsonType;
2225
import org.bson.BsonValue;
2326
import org.bson.ByteBuf;
2427
import org.bson.io.ByteBufferBsonInput;
2528

29+
import java.io.Closeable;
2630
import java.util.ArrayList;
2731
import java.util.Collection;
2832
import java.util.Iterator;
@@ -33,20 +37,33 @@
3337

3438
import static com.mongodb.internal.connection.ByteBufBsonHelper.readBsonValue;
3539

36-
final class ByteBufBsonArray extends BsonArray {
40+
@NotThreadSafe
41+
final class ByteBufBsonArray extends BsonArray implements Closeable {
42+
private static final Logger LOGGER = Loggers.getLogger("connection");
3743
private final ByteBuf byteBuf;
3844

45+
/**
46+
* List of resources that need to be closed when this array is closed.
47+
* Tracks the main ByteBuf and iterator duplicates. Iterator buffers are automatically
48+
* removed and released when iteration completes normally to prevent memory accumulation.
49+
*/
50+
private final List<Closeable> trackedResources = new ArrayList<>();
51+
private boolean closed;
52+
3953
ByteBufBsonArray(final ByteBuf byteBuf) {
4054
this.byteBuf = byteBuf;
55+
trackedResources.add(byteBuf::release);
4156
}
4257

4358
@Override
4459
public Iterator<BsonValue> iterator() {
60+
ensureOpen();
4561
return new ByteBufBsonArrayIterator();
4662
}
4763

4864
@Override
4965
public List<BsonValue> getValues() {
66+
ensureOpen();
5067
List<BsonValue> values = new ArrayList<>();
5168
for (BsonValue cur: this) {
5269
//noinspection UseBulkOperation
@@ -59,6 +76,7 @@ public List<BsonValue> getValues() {
5976

6077
@Override
6178
public int size() {
79+
ensureOpen();
6280
int size = 0;
6381
for (BsonValue ignored : this) {
6482
size++;
@@ -68,11 +86,13 @@ public int size() {
6886

6987
@Override
7088
public boolean isEmpty() {
89+
ensureOpen();
7190
return !iterator().hasNext();
7291
}
7392

7493
@Override
7594
public boolean equals(final Object o) {
95+
ensureOpen();
7696
if (o == this) {
7797
return true;
7898
}
@@ -91,6 +111,7 @@ public boolean equals(final Object o) {
91111

92112
@Override
93113
public int hashCode() {
114+
ensureOpen();
94115
int hashCode = 1;
95116
for (BsonValue cur : this) {
96117
hashCode = 31 * hashCode + (cur == null ? 0 : cur.hashCode());
@@ -100,6 +121,7 @@ public int hashCode() {
100121

101122
@Override
102123
public boolean contains(final Object o) {
124+
ensureOpen();
103125
for (BsonValue cur : this) {
104126
if (Objects.equals(o, cur)) {
105127
return true;
@@ -111,6 +133,7 @@ public boolean contains(final Object o) {
111133

112134
@Override
113135
public Object[] toArray() {
136+
ensureOpen();
114137
Object[] retVal = new Object[size()];
115138
Iterator<BsonValue> it = iterator();
116139
for (int i = 0; i < retVal.length; i++) {
@@ -122,6 +145,7 @@ public Object[] toArray() {
122145
@Override
123146
@SuppressWarnings("unchecked")
124147
public <T> T[] toArray(final T[] a) {
148+
ensureOpen();
125149
int size = size();
126150
T[] retVal = a.length >= size ? a : (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
127151
Iterator<BsonValue> it = iterator();
@@ -133,6 +157,7 @@ public <T> T[] toArray(final T[] a) {
133157

134158
@Override
135159
public boolean containsAll(final Collection<?> c) {
160+
ensureOpen();
136161
for (Object e : c) {
137162
if (!contains(e)) {
138163
return false;
@@ -143,6 +168,7 @@ public boolean containsAll(final Collection<?> c) {
143168

144169
@Override
145170
public BsonValue get(final int index) {
171+
ensureOpen();
146172
if (index < 0) {
147173
throw new IndexOutOfBoundsException("Index out of range: " + index);
148174
}
@@ -159,6 +185,7 @@ public BsonValue get(final int index) {
159185

160186
@Override
161187
public int indexOf(final Object o) {
188+
ensureOpen();
162189
int i = 0;
163190
for (BsonValue cur : this) {
164191
if (Objects.equals(o, cur)) {
@@ -172,6 +199,7 @@ public int indexOf(final Object o) {
172199

173200
@Override
174201
public int lastIndexOf(final Object o) {
202+
ensureOpen();
175203
ListIterator<BsonValue> listIterator = listIterator(size());
176204
while (listIterator.hasPrevious()) {
177205
if (Objects.equals(o, listIterator.previous())) {
@@ -183,17 +211,20 @@ public int lastIndexOf(final Object o) {
183211

184212
@Override
185213
public ListIterator<BsonValue> listIterator() {
214+
ensureOpen();
186215
return listIterator(0);
187216
}
188217

189218
@Override
190219
public ListIterator<BsonValue> listIterator(final int index) {
220+
ensureOpen();
191221
// Not the most efficient way to do this, but unlikely anyone will notice in practice
192222
return new ArrayList<>(this).listIterator(index);
193223
}
194224

195225
@Override
196226
public List<BsonValue> subList(final int fromIndex, final int toIndex) {
227+
ensureOpen();
197228
if (fromIndex < 0) {
198229
throw new IndexOutOfBoundsException("fromIndex = " + fromIndex);
199230
}
@@ -234,6 +265,7 @@ public boolean addAll(final Collection<? extends BsonValue> c) {
234265

235266
@Override
236267
public boolean addAll(final int index, final Collection<? extends BsonValue> c) {
268+
ensureOpen();
237269
throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
238270
}
239271

@@ -267,34 +299,97 @@ public BsonValue remove(final int index) {
267299
throw new UnsupportedOperationException(READ_ONLY_MESSAGE);
268300
}
269301

302+
@Override
303+
public void close(){
304+
if (!closed) {
305+
for (Closeable closeable : trackedResources) {
306+
try {
307+
closeable.close();
308+
} catch (Exception e) {
309+
// Log and continue closing other resources
310+
LOGGER.error("Error closing resource", e);
311+
}
312+
}
313+
trackedResources.clear();
314+
closed = true;
315+
}
316+
}
317+
318+
private void ensureOpen() {
319+
if (closed) {
320+
throw new IllegalStateException("The BsonArray resources have been released.");
321+
}
322+
}
323+
270324
private class ByteBufBsonArrayIterator implements Iterator<BsonValue> {
271-
private final ByteBuf duplicatedByteBuf = byteBuf.duplicate();
272-
private final BsonBinaryReader bsonReader;
325+
private ByteBuf duplicatedByteBuf;
326+
private BsonBinaryReader reader;
327+
private Closeable resourceHandle;
328+
private boolean finished;
273329

274330
{
275-
bsonReader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf));
331+
ensureOpen();
332+
// Create duplicate buffer for iteration and track it temporarily
333+
duplicatedByteBuf = byteBuf.duplicate();
334+
resourceHandle = () -> {
335+
if (duplicatedByteBuf != null) {
336+
try {
337+
if (reader != null) {
338+
reader.close();
339+
}
340+
} catch (Exception e) {
341+
// Ignore
342+
}
343+
duplicatedByteBuf.release();
344+
duplicatedByteBuf = null;
345+
reader = null;
346+
}
347+
};
348+
trackedResources.add(resourceHandle);
349+
reader = new BsonBinaryReader(new ByteBufferBsonInput(duplicatedByteBuf));
276350
// While one might expect that this would be a call to BsonReader#readStartArray that doesn't work because BsonBinaryReader
277351
// expects to be positioned at the start at the beginning of a document, not an array. Fortunately, a BSON array has exactly
278352
// the same structure as a BSON document (the keys are just the array indices converted to a strings). So it works fine to
279353
// call BsonReader#readStartDocument here, and just skip all the names via BsonReader#skipName below.
280-
bsonReader.readStartDocument();
281-
bsonReader.readBsonType();
354+
reader.readStartDocument();
355+
reader.readBsonType();
282356
}
283357

284358
@Override
285359
public boolean hasNext() {
286-
return bsonReader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT;
360+
if (finished) {
361+
return false;
362+
}
363+
ensureOpen();
364+
boolean hasNext = reader.getCurrentBsonType() != BsonType.END_OF_DOCUMENT;
365+
if (!hasNext) {
366+
cleanup();
367+
}
368+
return hasNext;
287369
}
288370

289371
@Override
290372
public BsonValue next() {
291373
if (!hasNext()) {
292374
throw new NoSuchElementException();
293375
}
294-
bsonReader.skipName();
295-
BsonValue value = readBsonValue(duplicatedByteBuf, bsonReader);
296-
bsonReader.readBsonType();
376+
reader.skipName();
377+
BsonValue value = readBsonValue(duplicatedByteBuf, reader, trackedResources);
378+
reader.readBsonType();
297379
return value;
298380
}
381+
382+
private void cleanup() {
383+
if (!finished) {
384+
finished = true;
385+
// Remove from tracked resources since we're cleaning up immediately
386+
trackedResources.remove(resourceHandle);
387+
try {
388+
resourceHandle.close();
389+
} catch (Exception e) {
390+
// Ignore
391+
}
392+
}
393+
}
299394
}
300395
}

0 commit comments

Comments
 (0)