From c593e680c90bab95bac636f2af8202b24741f54d Mon Sep 17 00:00:00 2001 From: Anton Borisov Date: Sat, 4 Apr 2026 18:44:16 +0100 Subject: [PATCH] Fix buffer leak in LimitBatchScanner --- .../scanner/batch/LimitBatchScanner.java | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java index f4a3be53f6..320474b754 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java @@ -37,6 +37,7 @@ import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.LimitScanRequest; import org.apache.fluss.rpc.messages.LimitScanResponse; +import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.SchemaUtil; @@ -124,17 +125,28 @@ public CloseableIterator pollBatch(Duration timeout) throws IOExcep if (endOfInput) { return null; } + LimitScanResponse response; try { - LimitScanResponse response = scanFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - List scanRows = parseLimitScanResponse(response); - endOfInput = true; - return CloseableIterator.wrap(scanRows.iterator()); + response = scanFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // poll next time return CloseableIterator.emptyIterator(); } catch (Exception e) { throw new IOException(e); } + + ByteBuf parsedByteBuf = response.getParsedByteBuf(); + try { + List scanRows = parseLimitScanResponse(response); + endOfInput = true; + return CloseableIterator.wrap(scanRows.iterator()); + } catch (Exception e) { + throw new IOException(e); + } finally { + if (parsedByteBuf != null) { + parsedByteBuf.release(); + } + } } private List parseLimitScanResponse(LimitScanResponse limitScanResponse) {